You are viewing a plain text version of this content; the link to its canonical version was not preserved in this plain-text copy.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/28 01:05:17 UTC

[40/50] incubator-kylin git commit: refactor

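refactor: extract slice building from IIStreamBuilder into a new SliceBuilder class; StreamParser.parse() no longer takes the column list (JsonStreamParser now receives it in its constructor); IIStreamBuilder now takes an IIInstance and records stream offsets through IIManager; remove the test-only ToyIIStreamBuilder.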


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8e6afbf4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8e6afbf4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8e6afbf4

Branch: refs/heads/streaming-localdict
Commit: 8e6afbf44d5d4a96b0e55cf8c617fbcdb21a582e
Parents: 12920dc
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri Mar 27 16:50:50 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Mar 27 16:50:50 2015 +0800

----------------------------------------------------------------------
 .../invertedindex/ToyIIStreamBuilder.java       |  36 ------
 .../kylin/streaming/JsonStreamParser.java       |  12 +-
 .../apache/kylin/streaming/StreamParser.java    |   4 +-
 .../kylin/streaming/StringStreamParser.java     |   2 +-
 .../kylin/streaming/cube/CubeStreamBuilder.java |   2 +-
 .../invertedindex/IIStreamBuilder.java          | 120 +++++-------------
 .../streaming/invertedindex/SliceBuilder.java   | 126 +++++++++++++++++++
 .../invertedindex/PrintOutStreamBuilder.java    |   5 +-
 8 files changed, 167 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8e6afbf4/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/ToyIIStreamBuilder.java
----------------------------------------------------------------------
Review note: ToyIIStreamBuilder has to go with this commit. It called
super(queue, null, desc, partitionId) with an IIDesc and a null HTable name, and overrode
outputSlice(). The new IIStreamBuilder constructor takes an IIInstance, no longer tolerates
a null table name (the null check is removed), and outputSlice() no longer exists.
Confirm that no remaining test still references this class.
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/ToyIIStreamBuilder.java b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/ToyIIStreamBuilder.java
deleted file mode 100644
index 3e2a892..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/ToyIIStreamBuilder.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.apache.kylin.invertedindex.model.IIRow;
-import org.apache.kylin.streaming.Stream;
-import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
-
-/**
- * Created by Hongbin Ma(Binmahone) on 3/26/15.
- *
- * A IIStreamBuilder that can hold all the built slices in form of IIRow
- * This is only for test use
- */
-public class ToyIIStreamBuilder extends IIStreamBuilder {
-    private List<IIRow> result;
-
-    public ToyIIStreamBuilder(BlockingQueue<Stream> queue, IIDesc desc, int partitionId, List<IIRow> result) {
-        super(queue, null, desc, partitionId);
-        this.result = result;
-    }
-
-    protected void outputSlice(Slice slice, TableRecordInfo tableRecordInfo) throws IOException {
-        IIKeyValueCodec codec = new IIKeyValueCodec(tableRecordInfo.getDigest());
-        for (IIRow iiRow : codec.encodeKeyValue(slice)) {
-            result.add(iiRow);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8e6afbf4/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java b/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
index 5c8b49d..2912aa7 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
@@ -50,15 +50,18 @@ import java.util.Map;
  */
 public final class JsonStreamParser implements StreamParser {
 
-    public static final JsonStreamParser instance = new JsonStreamParser();
+    private final List<TblColRef> allColumns;
 
-    private final JsonParser jsonParser = new JsonParser();
+    private static final JsonParser JSON_PARSER = new JsonParser();
 
-    private JsonStreamParser(){}
+    public JsonStreamParser(List<TblColRef> allColumns){
+        this.allColumns = allColumns;
+    }
 
     @Override
-    public List<String> parse(Stream stream, List<TblColRef> allColumns) {
-        final JsonObject root = jsonParser.parse(new String(stream.getRawData())).getAsJsonObject();
+    public List<String> parse(Stream stream) {
+        // Decode explicitly as UTF-8 (assumed JSON encoding); new String(byte[]) would use the JVM default charset.
+        final JsonObject root = JSON_PARSER.parse(new String(stream.getRawData(), java.nio.charset.StandardCharsets.UTF_8)).getAsJsonObject();
         ArrayList<String> result = Lists.newArrayList();
 
         for (TblColRef column : allColumns) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8e6afbf4/streaming/src/main/java/org/apache/kylin/streaming/StreamParser.java
----------------------------------------------------------------------
Review note: StreamParser.parse() no longer receives the column list. A parser that needs
column metadata must now receive it at construction time (JsonStreamParser takes a
List<TblColRef> in its constructor), so each parser instance is tied to one column layout.
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamParser.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamParser.java
index 9b41c95..c6b23ff 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamParser.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamParser.java
@@ -34,8 +34,6 @@
 
 package org.apache.kylin.streaming;
 
-import org.apache.kylin.metadata.model.TblColRef;
-
 import java.util.List;
 
 /**
@@ -43,5 +41,5 @@ import java.util.List;
  */
 public interface StreamParser {
 
-    List<String> parse(Stream stream, List<TblColRef> allColumns);
+    List<String> parse(Stream stream);
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8e6afbf4/streaming/src/main/java/org/apache/kylin/streaming/StringStreamParser.java
----------------------------------------------------------------------
Review note: only the signature changes here. StringStreamParser never used the column list;
it splits the decoded record on ','. It decodes the raw bytes with new String(byte[]),
which uses the JVM default charset; that line is unchanged by this commit.
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StringStreamParser.java b/streaming/src/main/java/org/apache/kylin/streaming/StringStreamParser.java
index 3c62a3a..4fb26fa 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StringStreamParser.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StringStreamParser.java
@@ -49,7 +49,7 @@ public final class StringStreamParser implements StreamParser {
 
     private StringStreamParser(){}
     @Override
-    public List<String> parse(Stream stream, List<TblColRef> allColumns) {
+    public List<String> parse(Stream stream) {
         return Lists.newArrayList(new String(stream.getRawData()).split(","));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8e6afbf4/streaming/src/main/java/org/apache/kylin/streaming/cube/CubeStreamBuilder.java
----------------------------------------------------------------------
Review note: after this change parseStream() ignores its CubeDesc argument. It used to pass
Lists.newArrayList(desc.listAllColumns()) to the parser on every call. Now the parser returned
by getStreamParser() must already be bound to this cube's columns, and the
JsonStreamParser.instance singleton no longer exists. Confirm that whoever configures this
builder's parser constructs it with desc.listAllColumns().
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/cube/CubeStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/cube/CubeStreamBuilder.java
index 5c2efdc..ba3f495 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/cube/CubeStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/cube/CubeStreamBuilder.java
@@ -403,7 +403,7 @@ public class CubeStreamBuilder extends StreamBuilder {
     }
 
     private List<String> parseStream(Stream stream, CubeDesc desc) {
-        return getStreamParser().parse(stream, Lists.newArrayList(desc.listAllColumns()));
+        return getStreamParser().parse(stream);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8e6afbf4/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
index 0cf3c77..72e23ff 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
@@ -34,36 +34,27 @@
 
 package org.apache.kylin.streaming.invertedindex;
 
-import com.google.common.base.Function;
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.dict.DictionaryGenerator;
-import org.apache.kylin.invertedindex.index.BatchSliceBuilder;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecord;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.invertedindex.model.IIDesc;
 import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
 import org.apache.kylin.invertedindex.model.IIRow;
-import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.streaming.Stream;
 import org.apache.kylin.streaming.StreamBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -75,98 +66,37 @@ public class IIStreamBuilder extends StreamBuilder {
     private static Logger logger = LoggerFactory.getLogger(IIStreamBuilder.class);
 
     private final IIDesc desc;
+    private final IIInstance ii;
     private final HTableInterface hTable;
-    private final BatchSliceBuilder sliceBuilder;
-
-    public IIStreamBuilder(BlockingQueue<Stream> queue, String hTableName, IIDesc desc, int partitionId) {
-        super(queue, desc.getSliceSize());
-        this.desc = desc;
+    private final SliceBuilder sliceBuilder;
+    private final int partitionId;
+
+    public IIStreamBuilder(BlockingQueue<Stream> queue, String hTableName, IIInstance iiInstance, int partitionId) {
+        super(queue, iiInstance.getDescriptor().getSliceSize());
+        this.ii = iiInstance;
+        this.desc = iiInstance.getDescriptor();
+        this.partitionId = partitionId;
         try {
-            if (hTableName != null) {
-                this.hTable = HConnectionManager.createConnection(HBaseConfiguration.create()).getTable(hTableName);
-            } else {
-                this.hTable = null;
-            }
+            this.hTable = HConnectionManager.createConnection(HBaseConfiguration.create()).getTable(hTableName);
         } catch (IOException e) {
             logger.error("cannot open htable name:" + hTableName, e);
             throw new RuntimeException("cannot open htable name:" + hTableName, e);
         }
-        sliceBuilder = new BatchSliceBuilder(desc, (short) partitionId);
+        sliceBuilder = new SliceBuilder(desc, (short) partitionId);
     }
 
     @Override
     protected void build(List<Stream> streamsToBuild) throws IOException {
         logger.info("stream build start, size:" + streamsToBuild.size());
         Stopwatch stopwatch = new Stopwatch().start();
-        List<List<String>> table = Lists.transform(streamsToBuild, new Function<Stream, List<String>>() {
-            @Nullable
-            @Override
-            public List<String> apply(@Nullable Stream input) {
-                return parseStream(input, desc);
-            }
-        });
-        final Map<Integer, Dictionary<?>> dictionaryMap = buildDictionary(table, desc);
-        TableRecordInfo tableRecordInfo = new TableRecordInfo(desc, dictionaryMap);
-        final Slice slice = buildSlice(table, sliceBuilder, tableRecordInfo, dictionaryMap);
+        final Slice slice = sliceBuilder.buildSlice(streamsToBuild, getStreamParser());
         logger.info("slice info, shard:" + slice.getShard() + " timestamp:" + slice.getTimestamp() + " record count:" + slice.getRecordCount());
 
-        outputSlice(slice, tableRecordInfo);
-        submitOffset();
-
+        loadToHBase(hTable, slice, new IIKeyValueCodec(slice.getInfo()));
+        // Commit the offset of the last stream in this batch (assumes the batch is ordered by offset).
+        submitOffset(streamsToBuild.get(streamsToBuild.size() - 1).getOffset());
         stopwatch.stop();
-        logger.info("stream build finished, size:" + streamsToBuild.size() + " elapsed time:" + stopwatch.elapsedTime(TimeUnit.MILLISECONDS) + TimeUnit.MILLISECONDS);
-    }
-
-    protected void outputSlice(Slice slice, TableRecordInfo tableRecordInfo) throws IOException {
-        loadToHBase(hTable, slice, new IIKeyValueCodec(tableRecordInfo.getDigest()));
-    }
-
-    private Map<Integer, Dictionary<?>> buildDictionary(List<List<String>> table, IIDesc desc) {
-        HashMultimap<TblColRef, String> valueMap = HashMultimap.create();
-        final List<TblColRef> allColumns = desc.listAllColumns();
-        for (List<String> row : table) {
-            for (int i = 0; i < row.size(); i++) {
-                String cell = row.get(i);
-                if (!desc.isMetricsCol(i)) {
-                    valueMap.put(allColumns.get(i), cell);
-                }
-            }
-        }
-
-        Map<Integer, Dictionary<?>> result = Maps.newHashMap();
-        for (TblColRef tblColRef : valueMap.keySet()) {
-            result.put(desc.findColumn(tblColRef), //
-                    DictionaryGenerator.buildDictionaryFromValueList(//
-                            tblColRef.getType(), //
-                            Collections2.transform(valueMap.get(tblColRef), new Function<String, byte[]>() {
-                                @Nullable
-                                @Override
-                                public byte[] apply(String input) {
-                                    return input.getBytes();
-                                }
-                            })));
-        }
-        return result;
-    }
-
-    private List<String> parseStream(Stream stream, IIDesc desc) {
-        return getStreamParser().parse(stream, desc.listAllColumns());
-    }
-
-    private Slice buildSlice(List<List<String>> table, BatchSliceBuilder sliceBuilder, final TableRecordInfo tableRecordInfo, Map<Integer, Dictionary<?>> localDictionary) {
-        final Slice slice = sliceBuilder.build(tableRecordInfo.getDigest(), Lists.transform(table, new Function<List<String>, TableRecord>() {
-            @Nullable
-            @Override
-            public TableRecord apply(@Nullable List<String> input) {
-                TableRecord result = tableRecordInfo.createTableRecord();
-                for (int i = 0; i < input.size(); i++) {
-                    result.setValueString(i, input.get(i));
-                }
-                return result;
-            }
-        }));
-        slice.setLocalDictionaries(localDictionary);
-        return slice;
+        logger.info("stream build finished, size:" + streamsToBuild.size() + " elapsed time:" + stopwatch.elapsedTime(TimeUnit.MILLISECONDS) + " " + TimeUnit.MILLISECONDS);
     }
 
     private void loadToHBase(HTableInterface hTable, Slice slice, IIKeyValueCodec codec) throws IOException {
@@ -192,8 +122,17 @@ public class IIStreamBuilder extends StreamBuilder {
         }
     }
 
-    private void submitOffset() {
-
+    private void submitOffset(long offset) {
+        final IIManager iiManager = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
+        final IIInstance instance = iiManager.getII(ii.getName());
+        instance.getStreamOffsets().set(partitionId, offset);
+        try {
+            iiManager.updateII(instance);
+            logger.info("submit offset:" + offset);
+        } catch (IOException e) {
+            logger.error("error submit offset:" + offset, e);
+            throw new RuntimeException(e);
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8e6afbf4/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java
new file mode 100644
index 0000000..ac2ce0f
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java
@@ -0,0 +1,162 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.invertedindex;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.invertedindex.index.BatchSliceBuilder;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.streaming.Stream;
+import org.apache.kylin.streaming.StreamParser;
+
+import javax.annotation.Nullable;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Builds one inverted-index {@link Slice} from a batch of raw {@link Stream} records.
+ *
+ * <p>Each record is parsed exactly once with the supplied {@link StreamParser}; a local dictionary
+ * is then built for every non-metric column from the values seen in the batch, and the rows are
+ * encoded against those dictionaries by a {@link BatchSliceBuilder}.
+ *
+ * Created by qianzhou on 3/27/15.
+ */
+public final class SliceBuilder {
+
+    private final BatchSliceBuilder sliceBuilder;
+    private final IIDesc iiDesc;
+
+    /**
+     * @param desc  the inverted-index descriptor that defines the column layout of every slice
+     * @param shard the shard id handed to the underlying {@link BatchSliceBuilder}
+     */
+    public SliceBuilder(IIDesc desc, short shard){
+        this.iiDesc = desc;
+        this.sliceBuilder = new BatchSliceBuilder(desc, shard);
+    }
+
+    /**
+     * Parses {@code streams} and builds a slice that carries its own local dictionaries.
+     *
+     * @param streams      raw records of one batch
+     * @param streamParser parser for a single record; its output is assumed to be ordered like
+     *                     {@code iiDesc.listAllColumns()}, since cells are matched to columns by index
+     * @return the built slice, with the batch's local dictionaries attached
+     */
+    public Slice buildSlice(List<Stream> streams, final StreamParser streamParser) {
+        // Lists.transform() returns a lazy view; copy it so each record is parsed only once rather
+        // than on every pass (dictionary building and record encoding both iterate the table).
+        List<List<String>> table = Lists.newArrayList(Lists.transform(streams, new Function<Stream, List<String>>() {
+            @Nullable
+            @Override
+            public List<String> apply(@Nullable Stream input) {
+                return streamParser.parse(input);
+            }
+        }));
+        final Map<Integer, Dictionary<?>> dictionaryMap = buildDictionary(table, iiDesc);
+        TableRecordInfo tableRecordInfo = new TableRecordInfo(iiDesc, dictionaryMap);
+        return build(table, sliceBuilder, tableRecordInfo, dictionaryMap);
+    }
+
+    /**
+     * Builds one dictionary per non-metric column from the distinct values found in {@code table}.
+     *
+     * @param table parsed rows; cell {@code i} belongs to {@code desc.listAllColumns().get(i)}
+     * @param desc  descriptor used to map cells to columns and to skip metric columns
+     * @return map from column index (as given by {@code desc.findColumn}) to its local dictionary
+     */
+    private Map<Integer, Dictionary<?>> buildDictionary(List<List<String>> table, IIDesc desc) {
+        HashMultimap<TblColRef, String> valueMap = HashMultimap.create();
+        final List<TblColRef> allColumns = desc.listAllColumns();
+        for (List<String> row : table) {
+            for (int i = 0; i < row.size(); i++) {
+                String cell = row.get(i);
+                if (!desc.isMetricsCol(i)) {
+                    valueMap.put(allColumns.get(i), cell);
+                }
+            }
+        }
+
+        Map<Integer, Dictionary<?>> result = Maps.newHashMap();
+        for (TblColRef tblColRef : valueMap.keySet()) {
+            final Collection<byte[]> bytes = Collections2.transform(valueMap.get(tblColRef), new Function<String, byte[]>() {
+                @Nullable
+                @Override
+                public byte[] apply(String input) {
+                    // Encode explicitly instead of with the JVM default charset so dictionary
+                    // content is platform-independent. NOTE(review): assumes later lookups
+                    // (TableRecord.setValueString) also encode strings as UTF-8 — confirm.
+                    return input.getBytes(StandardCharsets.UTF_8);
+                }
+            });
+            final Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(tblColRef.getType(), bytes);
+            result.put(desc.findColumn(tblColRef), dict);
+        }
+        return result;
+    }
+
+    /**
+     * Converts every row of {@code table} into a {@link TableRecord} and builds the slice from them.
+     *
+     * @return the built slice with {@code localDictionary} attached
+     */
+    private Slice build(List<List<String>> table, BatchSliceBuilder sliceBuilder, final TableRecordInfo tableRecordInfo, Map<Integer, Dictionary<?>> localDictionary) {
+        final Slice slice = sliceBuilder.build(tableRecordInfo.getDigest(), Lists.transform(table, new Function<List<String>, TableRecord>() {
+            @Nullable
+            @Override
+            public TableRecord apply(@Nullable List<String> input) {
+                TableRecord result = tableRecordInfo.createTableRecord();
+                for (int i = 0; i < input.size(); i++) {
+                    result.setValueString(i, input.get(i));
+                }
+                return result;
+            }
+        }));
+        slice.setLocalDictionaries(localDictionary);
+        return slice;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8e6afbf4/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/PrintOutStreamBuilder.java
----------------------------------------------------------------------
Review note: parse() no longer takes the column list, so the allColumns field assigned in
this constructor is no longer read in the lines shown. It appears removable; confirm that
nothing else in the class still uses it.
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/PrintOutStreamBuilder.java b/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/PrintOutStreamBuilder.java
index e83bdc5..e5873c0 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/PrintOutStreamBuilder.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/PrintOutStreamBuilder.java
@@ -40,7 +40,6 @@ import org.apache.kylin.streaming.JsonStreamParser;
 import org.apache.kylin.streaming.Stream;
 import org.apache.kylin.streaming.StreamBuilder;
 
-import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 
@@ -53,14 +52,14 @@ public class PrintOutStreamBuilder extends StreamBuilder {
 
     public PrintOutStreamBuilder(BlockingQueue<Stream> streamQueue, int sliceSize, List<TblColRef> allColumns) {
         super(streamQueue, sliceSize);
-        setStreamParser(JsonStreamParser.instance);
+        setStreamParser(new JsonStreamParser(allColumns));
         this.allColumns = allColumns;
     }
 
     @Override
     protected void build(List<Stream> streamsToBuild) throws Exception {
         for (Stream stream : streamsToBuild) {
-            final List<String> row = getStreamParser().parse(stream, allColumns);
+            final List<String> row = getStreamParser().parse(stream);
             System.out.println("offset:" + stream.getOffset() + " " + StringUtils.join(row, ","));
         }
     }