You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/14 01:00:44 UTC

[09/50] [abbrv] incubator-kylin git commit: refactor

refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/1142b686
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/1142b686
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/1142b686

Branch: refs/heads/streaming-localdict
Commit: 1142b6865e948d7a72feb0344935d82c77a0535d
Parents: 132b0ca
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue Mar 10 17:33:02 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Tue Mar 10 17:33:02 2015 +0800

----------------------------------------------------------------------
 .../invertedindex/model/IIKeyValueCodec.java    | 31 ++++------
 .../kylin/invertedindex/model/KeyValuePair.java | 65 ++++++++++++++++++++
 .../invertedindex/InvertedIndexReducer.java     | 10 +--
 .../apache/kylin/streaming/StreamBuilder.java   |  2 +-
 .../invertedindex/IIStreamBuilder.java          |  4 +-
 5 files changed, 87 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1142b686/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
index 8bb6e88..070b672 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
@@ -46,48 +46,43 @@ public class IIKeyValueCodec {
 		this.infoDigest = digest;
 	}
 
-	public Collection<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> encodeKeyValue(
-			Slice slice) {
-		ArrayList<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> result = Lists
+	public Collection<KeyValuePair> encodeKeyValue(Slice slice) {
+		ArrayList<KeyValuePair> result = Lists
 				.newArrayList();
 		ColumnValueContainer[] containers = slice.getColumnValueContainers();
 		for (int col = 0; col < containers.length; col++) {
 			if (containers[col] instanceof BitMapContainer) {
-				collectKeyValues(slice, col, (BitMapContainer) containers[col],
-						result);
+				result.addAll(collectKeyValues(slice, col, (BitMapContainer) containers[col]));
 			} else if (containers[col] instanceof CompressedValueContainer) {
-				collectKeyValues(slice, col,
-						(CompressedValueContainer) containers[col], result);
+				result.add(collectKeyValues(slice, col, (CompressedValueContainer) containers[col]));
 			} else {
-				throw new IllegalArgumentException("Unkown container class "
+				throw new IllegalArgumentException("Unknown container class "
 						+ containers[col].getClass());
 			}
 		}
 		return result;
 	}
 
-	private void collectKeyValues(Slice slice,
+	private KeyValuePair collectKeyValues(Slice slice,
 			int col,
-			CompressedValueContainer container, //
-			ArrayList<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> result) {
+			CompressedValueContainer container) {
 		ImmutableBytesWritable key = encodeKey(slice.getShard(),
 				slice.getTimestamp(), col, -1);
 		ImmutableBytesWritable value = container.toBytes();
-		result.add(new Pair<ImmutableBytesWritable, ImmutableBytesWritable>(
-				key, value));
+        return new KeyValuePair(col, key, value);
 	}
 
-	private void collectKeyValues(Slice slice,
+	private List<KeyValuePair> collectKeyValues(Slice slice,
 			int col,
-			BitMapContainer container, //
-			ArrayList<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> result) {
+			BitMapContainer container) {
 		List<ImmutableBytesWritable> values = container.toBytes();
+        ArrayList<KeyValuePair> list = Lists.newArrayListWithExpectedSize(values.size());
 		for (int v = 0; v < values.size(); v++) {
 			ImmutableBytesWritable key = encodeKey(slice.getShard(),
 					slice.getTimestamp(), col, v);
-			result.add(new Pair<ImmutableBytesWritable, ImmutableBytesWritable>(
-					key, values.get(v)));
+            list.add(new KeyValuePair(col, key, values.get(v)));
 		}
+        return list;
 	}
 
 	ImmutableBytesWritable encodeKey(short shard, long timestamp, int col,

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1142b686/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/KeyValuePair.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/KeyValuePair.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/KeyValuePair.java
new file mode 100644
index 0000000..445389f
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/KeyValuePair.java
@@ -0,0 +1,65 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ *
+ */
+
+package org.apache.kylin.invertedindex.model;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+/**
+ * Carries an encoded key and value (both ImmutableBytesWritable) plus the integer column index handed in with them; only getters are exposed. Created by qianzhou on 3/10/15.
+ */
+public final class KeyValuePair {
+
+    private int columnIndex; // column index supplied by the caller at construction
+    private ImmutableBytesWritable key; // key reference, returned unchanged by getKey()
+    private ImmutableBytesWritable value; // value reference, returned unchanged by getValue()
+
+    public KeyValuePair(int columnIndex, ImmutableBytesWritable key, ImmutableBytesWritable value) {
+        this.columnIndex = columnIndex; // stored as given; no range check is performed
+        this.key = key; // reference is kept as-is (no copy, no null check)
+        this.value = value; // reference is kept as-is (no copy, no null check)
+    }
+
+    public int getColumnIndex() { // returns the column index given to the constructor
+        return columnIndex;
+    }
+
+    public ImmutableBytesWritable getKey() { // returns the same key instance given to the constructor
+        return key;
+    }
+
+    public ImmutableBytesWritable getValue() { // returns the same value instance given to the constructor
+        return value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1142b686/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
index f65db0e..40cca36 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
@@ -18,11 +18,8 @@
 
 package org.apache.kylin.job.hadoop.invertedindex;
 
-import java.io.IOException;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.mr.KylinReducer;
@@ -34,10 +31,13 @@ import org.apache.kylin.invertedindex.index.SliceBuilder;
 import org.apache.kylin.invertedindex.index.TableRecord;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.invertedindex.model.KeyValuePair;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 
+import java.io.IOException;
+
 /**
  * @author yangli9
  */
@@ -92,8 +92,8 @@ public class InvertedIndexReducer extends KylinReducer<LongWritable, ImmutableBy
     }
 
     private void output(Slice slice, Context context) throws IOException, InterruptedException {
-        for (Pair<ImmutableBytesWritable, ImmutableBytesWritable> pair : kv.encodeKeyValue(slice)) {
-            context.write(pair.getFirst(), pair.getSecond());
+        for (KeyValuePair pair : kv.encodeKeyValue(slice)) {
+            context.write(pair.getKey(), pair.getValue());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1142b686/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index d86da90..a86a9b1 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -72,7 +72,7 @@ public abstract class StreamBuilder implements Runnable {
             List<Stream> streamToBuild = Lists.newArrayList();
             clearCounter();
             while (true) {
-                final Stream stream = streamQueue.poll(200, TimeUnit.MILLISECONDS);
+                final Stream stream = streamQueue.poll(50, TimeUnit.MILLISECONDS);
                 if (stream == null) {
                     continue;
                 } else {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1142b686/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
index 8b2673d..6339fe4 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
@@ -168,7 +168,9 @@ public class IIStreamBuilder extends StreamBuilder {
                 final byte[] key = pair.getFirst().get();
                 final byte[] value = pair.getSecond().get();
                 Put put = new Put(key);
-                put.add("cf".getBytes(), "qn".getBytes(), value);
+                put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, value);
+                //dictionary
+//                put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, value);
                 data.add(put);
             }
             hTable.put(data);