You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/21 01:27:49 UTC

[44/50] [abbrv] incubator-kylin git commit: refactor

refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/dda0e333
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/dda0e333
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/dda0e333

Branch: refs/heads/streaming-localdict
Commit: dda0e3337abd39eb70645e8882a616cac116bfc8
Parents: 4a5c9ce
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri Mar 20 16:55:48 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Mar 20 16:55:48 2015 +0800

----------------------------------------------------------------------
 .../invertedindex/index/BatchSliceBuilder.java  | 100 +++++++++++++++++++
 .../invertedindex/model/IIKeyValueCodec.java    |   2 +-
 .../apache/kylin/job/BuildIIWithStreamTest.java |  22 +++-
 .../invertedindex/IIStreamBuilder.java          |  37 ++++---
 4 files changed, 140 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dda0e333/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java
new file mode 100644
index 0000000..94b70c1
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java
@@ -0,0 +1,100 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.invertedindex.index;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.invertedindex.model.IIDesc;
+
+import java.util.List;
+
+/**
+ * Builds one {@link Slice} per call from a complete single-shard batch; build() mutates fields without synchronization.
+ * Created by qianzhou on 3/20/15.
+ */
+public class BatchSliceBuilder {
+    private final int nColumns;
+    private final int nRecordsCap;
+    private final short shard;
+    private final IIDesc desc;
+    /** Timestamp of the last slice built successfully; Long.MIN_VALUE until then. */
+    private long sliceTimestamp;
+    /** Scratch buffer reused for every column value read in build(). */
+    transient ImmutableBytesWritable temp = new ImmutableBytesWritable();
+
+    public BatchSliceBuilder(IIDesc desc, short shard) {
+        this.desc = desc;
+        this.nColumns = desc.listAllColumns().size();
+        this.nRecordsCap = Math.max(1, desc.getSliceSize()); // never below one record
+        this.shard = shard;
+        this.sliceTimestamp = Long.MIN_VALUE;
+    }
+
+    /**
+     * Packs all records into one slice stamped from the first record's timestamp (bumped by one on a tie).
+     * @throws IllegalArgumentException if records is null, empty, over the slice cap, or holds another shard
+     * @throws IllegalStateException if the timestamp goes backwards or a column got no value/metrics container
+     */
+    public Slice build(TableRecordInfoDigest digest, List<TableRecord> records) {
+        Preconditions.checkArgument(records != null && !records.isEmpty(), "records cannot be empty");
+        Preconditions.checkArgument(records.size() <= nRecordsCap, "batch count cannot exceed " + nRecordsCap);
+        final long newTimestamp = increaseSliceTimestamp(records.get(0).getTimestamp());
+        ColumnValueContainer[] containers = new ColumnValueContainer[nColumns];
+        for (int i : desc.getValueColumns()) {
+            containers[i] = new CompressedValueContainer(digest, i, nRecordsCap);
+        }
+        for (int i : desc.getMetricsColumns()) {
+            containers[i] = new CompressedValueContainer(digest, i, nRecordsCap);
+        }
+        for (TableRecord record : records) {
+            Preconditions.checkArgument(record.getShard() == shard, "record shard:" + record.getShard() + " but should be " + shard);
+            for (int i = 0; i < nColumns; i++) {
+                Preconditions.checkState(containers[i] != null, "no container for column %s", i);
+                record.getValueBytes(i, temp);
+                containers[i].append(temp);
+            }
+        }
+        sliceTimestamp = newTimestamp; // committed only after the whole batch was accepted
+        return new Slice(digest, shard, newTimestamp, containers);
+    }
+
+    /** Returns timestamp, plus one when it equals the previous slice's; rejects timestamps that go backwards. */
+    private long increaseSliceTimestamp(long timestamp) {
+        if (timestamp < sliceTimestamp) {
+            throw new IllegalStateException("timestamp " + timestamp + " is before last slice timestamp " + sliceTimestamp);
+        }
+        return timestamp == sliceTimestamp ? timestamp + 1 : timestamp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dda0e333/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
index ee26181..eedda4b 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
@@ -181,7 +181,7 @@ public class IIKeyValueCodec implements KeyValueCodec {
                     short lastShard = 0;
                     long lastTimestamp = 0;
 
-                    while (iterator().hasNext() && columns < incompleteDigest.getColumnCount()) {
+                    while (iterator.hasNext() && columns < incompleteDigest.getColumnCount()) {
                         final IIRow row = iterator.next();
                         final ImmutableBytesWritable key = row.getKey();
                         int i = key.getOffset();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dda0e333/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index 8c8d580..652fca0 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -34,6 +34,7 @@
 
 package org.apache.kylin.job;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ToolRunner;
@@ -68,9 +69,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -216,9 +215,24 @@ public class BuildIIWithStreamTest {
 
         ExecutorService executorService = Executors.newSingleThreadExecutor();
         final IIStreamBuilder streamBuilder = new IIStreamBuilder(queue, segment.getStorageLocationIdentifier(), desc, 0);
+        int count = 0;
+        List<String[]> rawData = Lists.newArrayList();
         while (reader.next()) {
-            queue.put(parse(reader.getRow()));
+            desc.getTimestampColumn();
+            rawData.add(reader.getRow());
+            count++;
         }
+        final int timestampColumn = desc.getTimestampColumn();
+        Collections.sort(rawData, new Comparator<String[]>() {
+            @Override
+            public int compare(String[] o1, String[] o2) {
+                return o1[timestampColumn].compareTo(o2[timestampColumn]);
+            }
+        });
+        for (String[] row : rawData) {
+            queue.put(parse(row));
+        }
+        logger.info("total record count:" + count + " htable:" + segment.getStorageLocationIdentifier());
         queue.put(new Stream(-1, null));
         final Future<?> future = executorService.submit(streamBuilder);
         try {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dda0e333/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
index 2ba90fd..2e5f0d1 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
@@ -36,7 +36,10 @@ package org.apache.kylin.streaming.invertedindex;
 
 import com.google.common.base.Function;
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.*;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -44,8 +47,8 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.invertedindex.index.BatchSliceBuilder;
 import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.SliceBuilder;
 import org.apache.kylin.invertedindex.index.TableRecord;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.invertedindex.model.IIDesc;
@@ -61,7 +64,6 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 
@@ -72,20 +74,20 @@ public class IIStreamBuilder extends StreamBuilder {
 
     private static Logger logger = LoggerFactory.getLogger(IIStreamBuilder.class);
 
-    private IIDesc desc = null;
-    private HTableInterface hTable = null;
-    private int partitionId = -1;
+    private final IIDesc desc;
+    private final HTableInterface hTable;
+    private final BatchSliceBuilder sliceBuilder;
 
     public IIStreamBuilder(LinkedBlockingDeque<Stream> queue, String hTableName, IIDesc desc, int partitionId) {
         super(queue, desc.getSliceSize());
         this.desc = desc;
-        this.partitionId = partitionId;
         try {
             this.hTable = HConnectionManager.createConnection(HBaseConfiguration.create()).getTable(hTableName);
         } catch (IOException e) {
             logger.error("cannot open htable name:" + hTableName, e);
             throw new RuntimeException("cannot open htable name:" + hTableName, e);
         }
+        sliceBuilder = new BatchSliceBuilder(desc, (short) partitionId);
     }
 
     @Override
@@ -101,8 +103,8 @@ public class IIStreamBuilder extends StreamBuilder {
         });
         final Map<Integer, Dictionary<?>> dictionaryMap = buildDictionary(table, desc);
         TableRecordInfo tableRecordInfo = new TableRecordInfo(desc, dictionaryMap);
-        SliceBuilder sliceBuilder = new SliceBuilder(tableRecordInfo, (short) partitionId);
         final Slice slice = buildSlice(table, sliceBuilder, tableRecordInfo, dictionaryMap);
+        logger.info("slice info, shard:" + slice.getShard() + " timestamp:" + slice.getTimestamp() + " record count:" + slice.getRecordCount());
         loadToHBase(hTable, slice, new IIKeyValueCodec(tableRecordInfo.getDigest()));
         submitOffset();
         stopwatch.stop();
@@ -137,15 +139,18 @@ public class IIStreamBuilder extends StreamBuilder {
         return Lists.newArrayList(new String(stream.getRawData()).split(","));
     }
 
-    private Slice buildSlice(List<List<String>> table, SliceBuilder sliceBuilder, TableRecordInfo tableRecordInfo, Map<Integer, Dictionary<?>> localDictionary) {
-        for (List<String> row : table) {
-            TableRecord tableRecord = tableRecordInfo.createTableRecord();
-            for (int i = 0; i < row.size(); i++) {
-                tableRecord.setValueString(i, row.get(i));
+    private Slice buildSlice(List<List<String>> table, BatchSliceBuilder sliceBuilder, final TableRecordInfo tableRecordInfo, Map<Integer, Dictionary<?>> localDictionary) {
+        final Slice slice = sliceBuilder.build(tableRecordInfo.getDigest(), Lists.transform(table, new Function<List<String>, TableRecord>() {
+            @Nullable
+            @Override
+            public TableRecord apply(@Nullable List<String> input) {
+                TableRecord result = tableRecordInfo.createTableRecord();
+                for (int i = 0; i < input.size(); i++) {
+                    result.setValueString(i, input.get(i));
+                }
+                return result;
             }
-            sliceBuilder.append(tableRecord);
-        }
-        final Slice slice = sliceBuilder.close();
+        }));
         slice.setLocalDictionaries(localDictionary);
         return slice;
     }