You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/03/10 08:50:41 UTC
[38/50] [abbrv] kylin git commit: KYLIN-1387 Streaming cubing doesn't
generate cuboids files on HDFS, cause cube merge failure
KYLIN-1387 Streaming cubing doesn't generate cuboids files on HDFS, cause cube merge failure
Conflicts:
engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/929c7a49
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/929c7a49
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/929c7a49
Branch: refs/heads/master
Commit: 929c7a4908a3cd655ab31e71eb6453971f3acd36
Parents: 250978d
Author: shaofengshi <sh...@apache.org>
Authored: Tue Feb 2 17:34:46 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Mar 8 12:19:36 2016 +0800
----------------------------------------------------------------------
.../cube/inmemcubing/CompoundCuboidWriter.java | 57 ++++++++++++++
.../kylin/cube/inmemcubing/ICuboidWriter.java | 4 +-
.../kylin/job/constant/ExecutableConstants.java | 1 +
.../kylin/engine/mr/steps/KVGTRecordWriter.java | 81 ++++++++++++++++++++
.../mr/steps/MapContextGTRecordWriter.java | 68 ++--------------
.../streaming/cube/StreamingCubeBuilder.java | 12 ++-
.../storage/hbase/steps/HBaseCuboidWriter.java | 24 +++---
.../hbase/steps/HBaseMROutput2Transition.java | 2 +-
.../kylin/storage/hbase/steps/HBaseMRSteps.java | 2 +-
.../hbase/steps/HBaseStreamingOutput.java | 8 +-
.../hbase/steps/SequenceFileCuboidWriter.java | 75 ++++++++++++++++++
11 files changed, 254 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
new file mode 100644
index 0000000..46eef50
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import org.apache.kylin.gridtable.GTRecord;
+
+import java.io.IOException;
+
+/**
+ */
+public class CompoundCuboidWriter implements ICuboidWriter {
+
+ private Iterable<ICuboidWriter> cuboidWriters;
+
+ public CompoundCuboidWriter(Iterable<ICuboidWriter> cuboidWriters) {
+ this.cuboidWriters = cuboidWriters;
+
+ }
+
+ @Override
+ public void write(long cuboidId, GTRecord record) throws IOException {
+ for (ICuboidWriter writer : cuboidWriters) {
+ writer.write(cuboidId, record);
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ for (ICuboidWriter writer : cuboidWriters) {
+ writer.flush();
+ }
+
+ }
+
+ @Override
+ public void close() throws IOException {
+ for (ICuboidWriter writer : cuboidWriters) {
+ writer.close();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
index 9e26e5e..e6cfa02 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
@@ -27,7 +27,7 @@ public interface ICuboidWriter {
void write(long cuboidId, GTRecord record) throws IOException;
- void flush();
+ void flush() throws IOException;
- void close();
+ void close() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index ba50880..d370b0d 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -56,6 +56,7 @@ public final class ExecutableConstants {
public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";
public static final String STEP_NAME_GARBAGE_COLLECTION = "Garbage Collection";
+ public static final String STEP_NAME_GARBAGE_COLLECTION_HDFS = "Garbage Collection on HDFS";
public static final String STEP_NAME_BUILD_II = "Build Inverted Index";
public static final String STEP_NAME_CONVERT_II_TO_HFILE = "Convert Inverted Index Data to HFile";
http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
new file mode 100644
index 0000000..e201705
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
@@ -0,0 +1,81 @@
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
+import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.gridtable.GTRecord;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ */
+public abstract class KVGTRecordWriter implements ICuboidWriter {
+
+ private static final Log logger = LogFactory.getLog(KVGTRecordWriter.class);
+ private Long lastCuboidId;
+ protected CubeSegment cubeSegment;
+ protected CubeDesc cubeDesc;
+
+ private AbstractRowKeyEncoder rowKeyEncoder;
+ private int dimensions;
+ private int measureCount;
+ private byte[] keyBuf;
+ private int[] measureColumnsIndex;
+ private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+ private ByteArrayWritable outputKey = new ByteArrayWritable();
+ private ByteArrayWritable outputValue = new ByteArrayWritable();
+ private long cuboidRowCount = 0;
+
+ //for shard
+
+ public KVGTRecordWriter(CubeDesc cubeDesc, CubeSegment cubeSegment) {
+ this.cubeDesc = cubeDesc;
+ this.cubeSegment = cubeSegment;
+ this.measureCount = cubeDesc.getMeasures().size();
+ }
+
+ @Override
+ public void write(long cuboidId, GTRecord record) throws IOException {
+
+ if (lastCuboidId == null || !lastCuboidId.equals(cuboidId)) {
+ if (lastCuboidId != null) {
+ logger.info("Cuboid " + lastCuboidId + " has " + cuboidRowCount + " rows");
+ cuboidRowCount = 0;
+ }
+ // output another cuboid
+ initVariables(cuboidId);
+ lastCuboidId = cuboidId;
+ }
+
+ cuboidRowCount++;
+ rowKeyEncoder.encode(record, record.getInfo().getPrimaryKey(), keyBuf);
+
+ //output measures
+ valueBuf.clear();
+ record.exportColumns(measureColumnsIndex, valueBuf);
+
+ outputKey.set(keyBuf, 0, keyBuf.length);
+ outputValue.set(valueBuf.array(), 0, valueBuf.position());
+ writeAsKeyValue(outputKey, outputValue);
+ }
+
+ protected abstract void writeAsKeyValue(ByteArrayWritable key, ByteArrayWritable value) throws IOException;
+
+ private void initVariables(Long cuboidId) {
+ rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, Cuboid.findById(cubeDesc, cuboidId));
+ keyBuf = rowKeyEncoder.createBuf();
+
+ dimensions = Long.bitCount(cuboidId);
+ measureColumnsIndex = new int[measureCount];
+ for (int i = 0; i < measureCount; i++) {
+ measureColumnsIndex[i] = dimensions + i;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
index 8416d95..6b4d07d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
@@ -1,76 +1,32 @@
package org.apache.kylin.engine.mr.steps;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
-import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
-import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.gridtable.GTRecord;
+
+import java.io.IOException;
/**
*/
-public class MapContextGTRecordWriter implements ICuboidWriter {
+public class MapContextGTRecordWriter extends KVGTRecordWriter {
private static final Log logger = LogFactory.getLog(MapContextGTRecordWriter.class);
protected MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext;
- private Long lastCuboidId;
- protected CubeSegment cubeSegment;
- protected CubeDesc cubeDesc;
-
- private AbstractRowKeyEncoder rowKeyEncoder;
- private int dimensions;
- private int measureCount;
- private byte[] keyBuf;
- private int[] measureColumnsIndex;
- private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
- private ByteArrayWritable outputKey = new ByteArrayWritable();
- private ByteArrayWritable outputValue = new ByteArrayWritable();
- private long cuboidRowCount = 0;
-
- //for shard
public MapContextGTRecordWriter(MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext, CubeDesc cubeDesc, CubeSegment cubeSegment) {
+ super(cubeDesc, cubeSegment);
this.mapContext = mapContext;
- this.cubeDesc = cubeDesc;
- this.cubeSegment = cubeSegment;
- this.measureCount = cubeDesc.getMeasures().size();
}
@Override
- public void write(long cuboidId, GTRecord record) throws IOException {
-
- if (lastCuboidId == null || !lastCuboidId.equals(cuboidId)) {
- if (lastCuboidId != null) {
- logger.info("Cuboid " + lastCuboidId + " has " + cuboidRowCount + " rows");
- cuboidRowCount = 0;
- }
- // output another cuboid
- initVariables(cuboidId);
- lastCuboidId = cuboidId;
- }
-
- cuboidRowCount++;
- rowKeyEncoder.encode(record, record.getInfo().getPrimaryKey(), keyBuf);
-
- //output measures
- valueBuf.clear();
- record.exportColumns(measureColumnsIndex, valueBuf);
-
- outputKey.set(keyBuf, 0, keyBuf.length);
- outputValue.set(valueBuf.array(), 0, valueBuf.position());
+ protected void writeAsKeyValue(ByteArrayWritable key, ByteArrayWritable value) throws IOException {
try {
- mapContext.write(outputKey, outputValue);
+ mapContext.write(key, value);
} catch (InterruptedException e) {
- throw new RuntimeException(e);
+ throw new IOException(e);
}
}
@@ -84,14 +40,4 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
}
- private void initVariables(Long cuboidId) {
- rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, Cuboid.findById(cubeDesc, cuboidId));
- keyBuf = rowKeyEncoder.createBuf();
-
- dimensions = Long.bitCount(cuboidId);
- measureColumnsIndex = new int[measureCount];
- for (int i = 0; i < measureCount; i++) {
- measureColumnsIndex[i] = dimensions + i;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
index c4f2b7e..ec2ad91 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
@@ -99,6 +99,14 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
+ } catch (IOException e) {
+ throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
+ } finally {
+ try {
+ cuboidWriter.close();
+ } catch (IOException e) {
+ throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
+ }
}
}
@@ -107,7 +115,9 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder {
CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
try {
- return cubeManager.appendSegments(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond(), false, false);
+ CubeSegment segment = cubeManager.appendSegments(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond(), false, false);
+ segment.setLastBuildJobID(segment.getUuid()); // give a fake job id
+ return segment;
} catch (IOException e) {
throw new RuntimeException("failed to create IBuildable", e);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
index c4dc0b5..ddc868d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
@@ -33,9 +33,8 @@
*/
package org.apache.kylin.storage.hbase.steps;
-import java.io.IOException;
-import java.util.List;
-
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
@@ -51,13 +50,14 @@ import org.apache.kylin.gridtable.GTRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
/**
*/
-public final class HBaseCuboidWriter implements ICuboidWriter {
+public class HBaseCuboidWriter implements ICuboidWriter {
- private static final Logger logger = LoggerFactory.getLogger(HBaseStreamingOutput.class);
+ private static final Logger logger = LoggerFactory.getLogger(HBaseCuboidWriter.class);
private static final int BATCH_PUT_THRESHOLD = 10000;
@@ -125,8 +125,8 @@ public final class HBaseCuboidWriter implements ICuboidWriter {
}
}
- public final void flush() {
- try {
+ @Override
+ public final void flush() throws IOException {
if (!puts.isEmpty()) {
long t = System.currentTimeMillis();
if (hTable != null) {
@@ -136,14 +136,12 @@ public final class HBaseCuboidWriter implements ICuboidWriter {
logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms");
puts.clear();
}
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
}
@Override
- public void close() {
-
+ public void close() throws IOException {
+ flush();
+ IOUtils.closeQuietly(hTable);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index 4c2737d..7bb3647 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -80,7 +80,7 @@ public class HBaseMROutput2Transition implements IMROutput2 {
@Override
public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
- jobFlow.addTask(steps.createMergeGCStep());
+ steps.addMergingGarbageCollectionSteps(jobFlow);
}
};
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index 2a21640..a828728 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -161,7 +161,7 @@ public class HBaseMRSteps extends JobBuilderSupport {
toDeletePaths.addAll(getMergingHDFSPaths());
HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
- step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+ step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
step.setDeletePaths(toDeletePaths);
step.setJobId(jobId);
http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
index 770be3c..4cc4794 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
@@ -18,9 +18,11 @@
package org.apache.kylin.storage.hbase.steps;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
+import com.google.common.collect.Lists;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -31,6 +33,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.inmemcubing.CompoundCuboidWriter;
import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.common.BatchConstants;
@@ -54,7 +57,10 @@ public class HBaseStreamingOutput implements IStreamingOutput {
final HTableInterface hTable;
hTable = createHTable(cubeSegment);
- return new HBaseCuboidWriter(cubeSegment, hTable);
+ List<ICuboidWriter> cuboidWriters = Lists.newArrayList();
+ cuboidWriters.add(new HBaseCuboidWriter(cubeSegment, hTable));
+ cuboidWriters.add(new SequenceFileCuboidWriter(cubeSegment.getCubeDesc(), cubeSegment));
+ return new CompoundCuboidWriter(cuboidWriters);
} catch (IOException e) {
throw new RuntimeException("failed to get ICuboidWriter", e);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
new file mode 100644
index 0000000..4d76522
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
@@ -0,0 +1,75 @@
+package org.apache.kylin.storage.hbase.steps;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.steps.KVGTRecordWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Writes cuboid key/value pairs into one Hadoop {@link SequenceFile} named
+ * {@code data.seq}, located in the segment's cuboid root directory as reported by
+ * {@code JobBuilderSupport.getCuboidRootPath}. The file is created lazily when the
+ * first record arrives; a directory already present at that path is deleted first.
+ */
+public class SequenceFileCuboidWriter extends KVGTRecordWriter {
+
+    private static final Logger logger = LoggerFactory.getLogger(SequenceFileCuboidWriter.class);
+    private SequenceFile.Writer writer = null;
+
+    // reused for every record instead of allocating two Text objects per append
+    private final Text textKey = new Text();
+    private final Text textValue = new Text();
+
+    public SequenceFileCuboidWriter(CubeDesc cubeDesc, CubeSegment segment) {
+        super(cubeDesc, segment);
+    }
+
+    /**
+     * Appends the pair to the sequence file, creating the file on first use.
+     *
+     * @throws IOException if the output directory or file cannot be prepared, or the append fails
+     */
+    @Override
+    protected void writeAsKeyValue(ByteArrayWritable key, ByteArrayWritable value) throws IOException {
+        if (writer == null) {
+            // class-wide lock kept from the original code
+            // NOTE(review): writer is not volatile - confirm whether concurrent callers are expected
+            synchronized (SequenceFileCuboidWriter.class) {
+                if (writer == null) {
+                    writer = createWriter();
+                }
+            }
+        }
+
+        textKey.set(key.array(), key.offset(), key.length());
+        textValue.set(value.array(), value.offset(), value.length());
+        writer.append(textKey, textValue);
+    }
+
+    /**
+     * Recreates the cuboid root directory empty and opens {@code data.seq} inside it.
+     */
+    private SequenceFile.Writer createWriter() throws IOException {
+        JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(cubeSegment, "SYSTEM");
+        String cuboidRoot = jobBuilderSupport.getCuboidRootPath(cubeSegment);
+        Path cuboidPath = new Path(cuboidRoot);
+
+        // Deliberately NOT closed here: Hadoop's FileSystem.get() normally hands out a
+        // cached instance shared by the whole JVM, and closing it makes other users fail
+        // with "Filesystem closed".
+        // NOTE(review): assumes HadoopUtil.getFileSystem delegates to FileSystem.get - confirm
+        FileSystem fs = HadoopUtil.getFileSystem(cuboidRoot);
+        if (fs.exists(cuboidPath)) {
+            fs.delete(cuboidPath, true);
+        }
+        fs.mkdirs(cuboidPath);
+
+        Path cuboidFile = new Path(cuboidPath, "data.seq");
+        logger.debug("Cuboid is written to " + cuboidFile);
+        return SequenceFile.createWriter(HadoopUtil.getCurrentConfiguration(), SequenceFile.Writer.file(cuboidFile), SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class));
+    }
+
+    /**
+     * Pushes buffered records to the file system; a no-op when nothing has been written yet.
+     */
+    @Override
+    public void flush() throws IOException {
+        if (writer != null) {
+            writer.hflush();
+        }
+    }
+
+    /**
+     * Closes the sequence file; a no-op when nothing has been written yet.
+     *
+     * @throws IOException if closing fails. This is reported rather than swallowed because a
+     *                     failed close can leave the cuboid file incomplete.
+     */
+    @Override
+    public void close() throws IOException {
+        if (writer != null) {
+            writer.close();
+        }
+    }
+}