You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/11/24 09:43:13 UTC
incubator-kylin git commit: KYLIN-980 FactDistinctColumnsJob support high cardinality columns
Repository: incubator-kylin
Updated Branches:
refs/heads/2.x-staging effd9a821 -> c3880be69
KYLIN-980 FactDistinctColumnsJob support high cardinality columns
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/c3880be6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/c3880be6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/c3880be6
Branch: refs/heads/2.x-staging
Commit: c3880be697ce6038604e4dfa5cd950710b5e811e
Parents: effd9a8
Author: shaofengshi <sh...@apache.org>
Authored: Tue Nov 24 16:42:35 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Nov 24 16:43:06 2015 +0800
----------------------------------------------------------------------
.../mr/steps/FactDistinctColumnsReducer.java | 55 +++++++++++++-------
1 file changed, 36 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c3880be6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 568dd77..66d63fb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -20,11 +20,7 @@ package org.apache.kylin.engine.mr.steps;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -96,24 +92,23 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
TblColRef col = columnList.get((int) key.get());
HashSet<ByteArray> set = new HashSet<ByteArray>();
- for (Text textValue : values) {
+
+ Text textValue = null;
+ Iterator<Text> valueItr = values.iterator();
+ while (valueItr.hasNext()) {
+ textValue = valueItr.next();
ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
set.add(value);
- }
-
- Configuration conf = context.getConfiguration();
- FileSystem fs = FileSystem.get(conf);
- String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
- FSDataOutputStream out = fs.create(new Path(outputPath, col.getName()));
-
- try {
- for (ByteArray value : set) {
- out.write(value.array(), value.offset(), value.length());
- out.write('\n');
+ if (set.size() >= 5000000) { // output when count reach 5 Million
+ outputDistinctValues(col, set, context);
+ set.clear();
}
- } finally {
- out.close();
}
+
+ if (set.isEmpty() == false) {
+ outputDistinctValues(col, set, context);
+ }
+
} else {
// for hll
long cuboidId = 0 - key.get();
@@ -138,6 +133,28 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
}
}
+
+ /**
+ * Writes one batch of distinct values for a column to {@code <OUTPUT_PATH>/<column name>},
+ * one value per line, each followed by '\n'. Values are written in the set's iteration
+ * order; no sorting is done here.
+ *
+ * If the file already exists it is appended to; otherwise it is created. This lets the
+ * caller flush its in-memory set in batches (every 5,000,000 values, plus a final flush
+ * for the remainder) without overwriting batches that were already written.
+ *
+ * NOTE(review): the caller clears the set between batches, so a value that arrives in more
+ * than one batch is written more than once. The file is only guaranteed distinct within a
+ * single batch. Confirm the consumer of this file tolerates duplicates.
+ * NOTE(review): the append branch is taken whenever the file exists. That presumably
+ * includes a file left behind by an earlier failed or retried attempt; the previous code
+ * always used fs.create, which overwrote it. Confirm stale output cannot be appended to.
+ * NOTE(review): not every Hadoop FileSystem implementation supports append(). Verify this
+ * on the target storage.
+ *
+ * @param col column whose values are being written; its name is used as the file name
+ * @param set batch of distinct values to write
+ * @param context reducer context, used here only to read the job configuration
+ * @throws IOException if the file cannot be checked, opened, written or closed
+ */
+ private void outputDistinctValues(TblColRef col, Set<ByteArray> set, Context context) throws IOException {
+ final Configuration conf = context.getConfiguration();
+ final FileSystem fs = FileSystem.get(conf);
+ final String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
+ final Path outputFile = new Path(outputPath, col.getName());
+ FSDataOutputStream out;
+ // An earlier batch for this column may already have created the file: extend it rather than overwrite it.
+ if (fs.exists(outputFile)) {
+ out = fs.append(outputFile);
+ } else {
+ out = fs.create(outputFile);
+ }
+
+ try {
+ for (ByteArray value : set) {
+ // Write only the value's own slice of the backing array, then a newline delimiter.
+ out.write(value.array(), value.offset(), value.length());
+ out.write('\n');
+ }
+ } finally {
+ // Close even if a write fails, so the stream is not leaked.
+ out.close();
+ }
+ }
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {