You are viewing a plain-text version of this content. (The canonical link to it, originally attached to the word "here", was not preserved in this copy.)
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/11/24 09:43:13 UTC

incubator-kylin git commit: KYLIN-980: FactDistinctColumnsJob supports high-cardinality columns

Repository: incubator-kylin
Updated Branches:
  refs/heads/2.x-staging effd9a821 -> c3880be69


KYLIN-980: FactDistinctColumnsJob supports high-cardinality columns


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/c3880be6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/c3880be6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/c3880be6

Branch: refs/heads/2.x-staging
Commit: c3880be697ce6038604e4dfa5cd950710b5e811e
Parents: effd9a8
Author: shaofengshi <sh...@apache.org>
Authored: Tue Nov 24 16:42:35 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Nov 24 16:43:06 2015 +0800

----------------------------------------------------------------------
 .../mr/steps/FactDistinctColumnsReducer.java    | 55 +++++++++++++-------
 1 file changed, 36 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c3880be6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 568dd77..66d63fb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -20,11 +20,7 @@ package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -96,24 +92,23 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
             TblColRef col = columnList.get((int) key.get());
 
             HashSet<ByteArray> set = new HashSet<ByteArray>();
-            for (Text textValue : values) {
+            
+            Text textValue = null;
+            Iterator<Text> valueItr = values.iterator();
+            while (valueItr.hasNext()) {
+                textValue = valueItr.next();
                 ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
                 set.add(value);
-            }
-
-            Configuration conf = context.getConfiguration();
-            FileSystem fs = FileSystem.get(conf);
-            String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
-            FSDataOutputStream out = fs.create(new Path(outputPath, col.getName()));
-
-            try {
-                for (ByteArray value : set) {
-                    out.write(value.array(), value.offset(), value.length());
-                    out.write('\n');
+                if (set.size() >= 5000000) { // output when count reach 5 Million 
+                    outputDistinctValues(col, set, context);
+                    set.clear();
                 }
-            } finally {
-                out.close();
             }
+            
+            if (set.isEmpty() == false) {
+                outputDistinctValues(col, set, context);
+            }
+            
         } else {
             // for hll
             long cuboidId = 0 - key.get();
@@ -138,6 +133,28 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
         }
 
     }
+    
+    private void outputDistinctValues(TblColRef col, Set<ByteArray> set, Context context) throws IOException {
+        final Configuration conf = context.getConfiguration();
+        final FileSystem fs = FileSystem.get(conf);
+        final String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
+        final Path outputFile = new Path(outputPath, col.getName());
+        FSDataOutputStream out;
+        if (fs.exists(outputFile)) {
+            out = fs.append(outputFile);
+        } else {
+            out = fs.create(outputFile);
+        }
+
+        try {
+            for (ByteArray value : set) {
+                out.write(value.array(), value.offset(), value.length());
+                out.write('\n');
+            }
+        } finally {
+            out.close();
+        }
+    }
 
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {