You are viewing a plain-text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/05/28 12:51:05 UTC

incubator-kylin git commit: logger

Repository: incubator-kylin
Updated Branches:
  refs/heads/streaming-cubing 838c8717e -> 1ecf68185


logger


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/1ecf6818
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/1ecf6818
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/1ecf6818

Branch: refs/heads/streaming-cubing
Commit: 1ecf68185f220b88e9c489ef29fce72968be69fb
Parents: 838c871
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu May 28 18:51:00 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Thu May 28 18:51:00 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/streaming/CubeStreamBuilder.java  | 58 +++++++++++---------
 1 file changed, 31 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1ecf6818/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
index 846ade5..a2a60fa 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
@@ -9,6 +9,7 @@ import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -88,33 +89,36 @@ public class CubeStreamBuilder extends StreamBuilder {
             return;
         }
         final List<List<String>> parsedStreamMessages = parseStream(streamMessages);
-        long startOffset = streamMessages.get(0).getOffset();
-        long endOffset = streamMessages.get(streamMessages.size() - 1).getOffset();
-        LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>(parsedStreamMessages);
-        blockingQueue.put(Collections.<String>emptyList());
-
-        final CubeInstance cubeInstance = cubeManager.getCube(cubeName);
-        final CubeDesc cubeDesc = cubeInstance.getDescriptor();
-        final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), System.currentTimeMillis());
-        final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(cubeInstance.getDescriptor(), parsedStreamMessages);
-
-        final Configuration conf = HadoopUtil.getCurrentConfiguration();
-        final String outputPath = "/tmp/kylin/cuboidstatistics/" + UUID.randomUUID().toString();
-        FactDistinctColumnsReducer.writeCuboidStatistics(conf, outputPath, samplingResult, 100);
-        ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), FileSystem.get(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION)), 0);
-
-        final Map<TblColRef, Dictionary<?>> dictionaryMap = buildDictionary(getTblColRefMap(cubeInstance), parsedStreamMessages);
-        writeDictionary(cubeSegment, dictionaryMap, startOffset, endOffset);
-
-        final HTableInterface hTable = createHTable(cubeSegment);
-
-        final CubeStreamRecordWriter gtRecordWriter = new CubeStreamRecordWriter(cubeDesc, hTable);
-        InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(blockingQueue, cubeInstance,
-                dictionaryMap, gtRecordWriter);
-
-        executorService.submit(inMemCubeBuilder).get();
-        gtRecordWriter.flush();
-        commitSegment(cubeSegment);
+        for (List<String> parsedStreamMessage : parsedStreamMessages) {
+            logger.info(StringUtils.join(parsedStreamMessage, ","));
+        }
+//        long startOffset = streamMessages.get(0).getOffset();
+//        long endOffset = streamMessages.get(streamMessages.size() - 1).getOffset();
+//        LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>(parsedStreamMessages);
+//        blockingQueue.put(Collections.<String>emptyList());
+//
+//        final CubeInstance cubeInstance = cubeManager.getCube(cubeName);
+//        final CubeDesc cubeDesc = cubeInstance.getDescriptor();
+//        final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), System.currentTimeMillis());
+//        final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(cubeInstance.getDescriptor(), parsedStreamMessages);
+//
+//        final Configuration conf = HadoopUtil.getCurrentConfiguration();
+//        final String outputPath = "/tmp/kylin/cuboidstatistics/" + UUID.randomUUID().toString();
+//        FactDistinctColumnsReducer.writeCuboidStatistics(conf, outputPath, samplingResult, 100);
+//        ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), FileSystem.get(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION)), 0);
+//
+//        final Map<TblColRef, Dictionary<?>> dictionaryMap = buildDictionary(getTblColRefMap(cubeInstance), parsedStreamMessages);
+//        writeDictionary(cubeSegment, dictionaryMap, startOffset, endOffset);
+//
+//        final HTableInterface hTable = createHTable(cubeSegment);
+//
+//        final CubeStreamRecordWriter gtRecordWriter = new CubeStreamRecordWriter(cubeDesc, hTable);
+//        InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(blockingQueue, cubeInstance,
+//                dictionaryMap, gtRecordWriter);
+//
+//        executorService.submit(inMemCubeBuilder).get();
+//        gtRecordWriter.flush();
+//        commitSegment(cubeSegment);
     }
 
     private void writeDictionary(CubeSegment cubeSegment, Map<TblColRef, Dictionary<?>> dictionaryMap, long startOffset, long endOffset) {