You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/14 01:00:44 UTC
[09/50] [abbrv] incubator-kylin git commit: refactor
refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/1142b686
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/1142b686
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/1142b686
Branch: refs/heads/streaming-localdict
Commit: 1142b6865e948d7a72feb0344935d82c77a0535d
Parents: 132b0ca
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue Mar 10 17:33:02 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Tue Mar 10 17:33:02 2015 +0800
----------------------------------------------------------------------
.../invertedindex/model/IIKeyValueCodec.java | 31 ++++------
.../kylin/invertedindex/model/KeyValuePair.java | 65 ++++++++++++++++++++
.../invertedindex/InvertedIndexReducer.java | 10 +--
.../apache/kylin/streaming/StreamBuilder.java | 2 +-
.../invertedindex/IIStreamBuilder.java | 4 +-
5 files changed, 87 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1142b686/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
index 8bb6e88..070b672 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
@@ -46,48 +46,43 @@ public class IIKeyValueCodec {
this.infoDigest = digest;
}
- public Collection<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> encodeKeyValue(
- Slice slice) {
- ArrayList<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> result = Lists
+ public Collection<KeyValuePair> encodeKeyValue(Slice slice) {
+ ArrayList<KeyValuePair> result = Lists
.newArrayList();
ColumnValueContainer[] containers = slice.getColumnValueContainers();
for (int col = 0; col < containers.length; col++) {
if (containers[col] instanceof BitMapContainer) {
- collectKeyValues(slice, col, (BitMapContainer) containers[col],
- result);
+ result.addAll(collectKeyValues(slice, col, (BitMapContainer) containers[col]));
} else if (containers[col] instanceof CompressedValueContainer) {
- collectKeyValues(slice, col,
- (CompressedValueContainer) containers[col], result);
+ result.add(collectKeyValues(slice, col, (CompressedValueContainer) containers[col]));
} else {
- throw new IllegalArgumentException("Unkown container class "
+ throw new IllegalArgumentException("Unknown container class "
+ containers[col].getClass());
}
}
return result;
}
- private void collectKeyValues(Slice slice,
+ private KeyValuePair collectKeyValues(Slice slice,
int col,
- CompressedValueContainer container, //
- ArrayList<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> result) {
+ CompressedValueContainer container) {
ImmutableBytesWritable key = encodeKey(slice.getShard(),
slice.getTimestamp(), col, -1);
ImmutableBytesWritable value = container.toBytes();
- result.add(new Pair<ImmutableBytesWritable, ImmutableBytesWritable>(
- key, value));
+ return new KeyValuePair(col, key, value);
}
- private void collectKeyValues(Slice slice,
+ private List<KeyValuePair> collectKeyValues(Slice slice,
int col,
- BitMapContainer container, //
- ArrayList<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> result) {
+ BitMapContainer container) {
List<ImmutableBytesWritable> values = container.toBytes();
+ ArrayList<KeyValuePair> list = Lists.newArrayListWithExpectedSize(values.size());
for (int v = 0; v < values.size(); v++) {
ImmutableBytesWritable key = encodeKey(slice.getShard(),
slice.getTimestamp(), col, v);
- result.add(new Pair<ImmutableBytesWritable, ImmutableBytesWritable>(
- key, values.get(v)));
+ list.add(new KeyValuePair(col, key, values.get(v)));
}
+ return list;
}
ImmutableBytesWritable encodeKey(short shard, long timestamp, int col,
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1142b686/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/KeyValuePair.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/KeyValuePair.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/KeyValuePair.java
new file mode 100644
index 0000000..445389f
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/KeyValuePair.java
@@ -0,0 +1,65 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.invertedindex.model;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+/**
+ * Holder for one encoded key/value pair plus the index of the column it was encoded from; every field is assigned exactly once, in the constructor. Created by qianzhou on 3/10/15.
+ */
+public final class KeyValuePair {
+
+ private final int columnIndex;
+ private final ImmutableBytesWritable key;
+ private final ImmutableBytesWritable value;
+
+ public KeyValuePair(int columnIndex, ImmutableBytesWritable key, ImmutableBytesWritable value) {
+ this.columnIndex = columnIndex;
+ this.key = key;
+ this.value = value;
+ }
+
+ public int getColumnIndex() {
+ return columnIndex;
+ }
+
+ public ImmutableBytesWritable getKey() {
+ return key;
+ }
+
+ public ImmutableBytesWritable getValue() {
+ return value;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1142b686/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
index f65db0e..40cca36 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
@@ -18,11 +18,8 @@
package org.apache.kylin.job.hadoop.invertedindex;
-import java.io.IOException;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.LongWritable;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.mr.KylinReducer;
@@ -34,10 +31,13 @@ import org.apache.kylin.invertedindex.index.SliceBuilder;
import org.apache.kylin.invertedindex.index.TableRecord;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.invertedindex.model.KeyValuePair;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import java.io.IOException;
+
/**
* @author yangli9
*/
@@ -92,8 +92,8 @@ public class InvertedIndexReducer extends KylinReducer<LongWritable, ImmutableBy
}
private void output(Slice slice, Context context) throws IOException, InterruptedException {
- for (Pair<ImmutableBytesWritable, ImmutableBytesWritable> pair : kv.encodeKeyValue(slice)) {
- context.write(pair.getFirst(), pair.getSecond());
+ for (KeyValuePair pair : kv.encodeKeyValue(slice)) {
+ context.write(pair.getKey(), pair.getValue());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1142b686/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index d86da90..a86a9b1 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -72,7 +72,7 @@ public abstract class StreamBuilder implements Runnable {
List<Stream> streamToBuild = Lists.newArrayList();
clearCounter();
while (true) {
- final Stream stream = streamQueue.poll(200, TimeUnit.MILLISECONDS);
+ final Stream stream = streamQueue.poll(50, TimeUnit.MILLISECONDS);
if (stream == null) {
continue;
} else {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1142b686/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
index 8b2673d..6339fe4 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
@@ -168,7 +168,9 @@ public class IIStreamBuilder extends StreamBuilder {
final byte[] key = pair.getFirst().get();
final byte[] value = pair.getSecond().get();
Put put = new Put(key);
- put.add("cf".getBytes(), "qn".getBytes(), value);
+ put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, value);
+ //dictionary
+// put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, value);
data.add(put);
}
hTable.put(data);