You are viewing a plain-text version of this content. The canonical link for it was a hyperlink in the original HTML page (lost in plain-text conversion).
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/05/28 12:51:05 UTC
incubator-kylin git commit: logger
Repository: incubator-kylin
Updated Branches:
refs/heads/streaming-cubing 838c8717e -> 1ecf68185
logger
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/1ecf6818
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/1ecf6818
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/1ecf6818
Branch: refs/heads/streaming-cubing
Commit: 1ecf68185f220b88e9c489ef29fce72968be69fb
Parents: 838c871
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu May 28 18:51:00 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Thu May 28 18:51:00 2015 +0800
----------------------------------------------------------------------
.../kylin/job/streaming/CubeStreamBuilder.java | 58 +++++++++++---------
1 file changed, 31 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1ecf6818/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
index 846ade5..a2a60fa 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
@@ -9,6 +9,7 @@ import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -88,33 +89,36 @@ public class CubeStreamBuilder extends StreamBuilder {
return;
}
final List<List<String>> parsedStreamMessages = parseStream(streamMessages);
- long startOffset = streamMessages.get(0).getOffset();
- long endOffset = streamMessages.get(streamMessages.size() - 1).getOffset();
- LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>(parsedStreamMessages);
- blockingQueue.put(Collections.<String>emptyList());
-
- final CubeInstance cubeInstance = cubeManager.getCube(cubeName);
- final CubeDesc cubeDesc = cubeInstance.getDescriptor();
- final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), System.currentTimeMillis());
- final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(cubeInstance.getDescriptor(), parsedStreamMessages);
-
- final Configuration conf = HadoopUtil.getCurrentConfiguration();
- final String outputPath = "/tmp/kylin/cuboidstatistics/" + UUID.randomUUID().toString();
- FactDistinctColumnsReducer.writeCuboidStatistics(conf, outputPath, samplingResult, 100);
- ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), FileSystem.get(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION)), 0);
-
- final Map<TblColRef, Dictionary<?>> dictionaryMap = buildDictionary(getTblColRefMap(cubeInstance), parsedStreamMessages);
- writeDictionary(cubeSegment, dictionaryMap, startOffset, endOffset);
-
- final HTableInterface hTable = createHTable(cubeSegment);
-
- final CubeStreamRecordWriter gtRecordWriter = new CubeStreamRecordWriter(cubeDesc, hTable);
- InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(blockingQueue, cubeInstance,
- dictionaryMap, gtRecordWriter);
-
- executorService.submit(inMemCubeBuilder).get();
- gtRecordWriter.flush();
- commitSegment(cubeSegment);
+ for (List<String> parsedStreamMessage : parsedStreamMessages) {
+ logger.info(StringUtils.join(parsedStreamMessage, ","));
+ }
+// long startOffset = streamMessages.get(0).getOffset();
+// long endOffset = streamMessages.get(streamMessages.size() - 1).getOffset();
+// LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>(parsedStreamMessages);
+// blockingQueue.put(Collections.<String>emptyList());
+//
+// final CubeInstance cubeInstance = cubeManager.getCube(cubeName);
+// final CubeDesc cubeDesc = cubeInstance.getDescriptor();
+// final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), System.currentTimeMillis());
+// final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(cubeInstance.getDescriptor(), parsedStreamMessages);
+//
+// final Configuration conf = HadoopUtil.getCurrentConfiguration();
+// final String outputPath = "/tmp/kylin/cuboidstatistics/" + UUID.randomUUID().toString();
+// FactDistinctColumnsReducer.writeCuboidStatistics(conf, outputPath, samplingResult, 100);
+// ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), FileSystem.get(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION)), 0);
+//
+// final Map<TblColRef, Dictionary<?>> dictionaryMap = buildDictionary(getTblColRefMap(cubeInstance), parsedStreamMessages);
+// writeDictionary(cubeSegment, dictionaryMap, startOffset, endOffset);
+//
+// final HTableInterface hTable = createHTable(cubeSegment);
+//
+// final CubeStreamRecordWriter gtRecordWriter = new CubeStreamRecordWriter(cubeDesc, hTable);
+// InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(blockingQueue, cubeInstance,
+// dictionaryMap, gtRecordWriter);
+//
+// executorService.submit(inMemCubeBuilder).get();
+// gtRecordWriter.flush();
+// commitSegment(cubeSegment);
}
private void writeDictionary(CubeSegment cubeSegment, Map<TblColRef, Dictionary<?>> dictionaryMap, long startOffset, long endOffset) {