Posted to commits@kylin.apache.org by li...@apache.org on 2016/12/14 07:30:49 UTC

[3/5] kylin git commit: KYLIN-1832 code review

http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 76212c8..6e894dd 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -83,7 +83,7 @@ import org.apache.kylin.engine.spark.cube.DefaultTupleConverter;
 import org.apache.kylin.engine.spark.util.IteratorUtils;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.MeasureAggregators;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew;
+import org.apache.kylin.measure.hllc.HLLCounter;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
@@ -241,15 +241,15 @@ public class SparkCubing extends AbstractApplication {
         }
     }
 
-    private Map<Long, HyperLogLogPlusCounterNew> sampling(final JavaRDD<List<String>> rowJavaRDD, final String cubeName, String segmentId) throws Exception {
+    private Map<Long, HLLCounter> sampling(final JavaRDD<List<String>> rowJavaRDD, final String cubeName, String segmentId) throws Exception {
         CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
         CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
         CubeDesc cubeDesc = cubeInstance.getDescriptor();
         CuboidScheduler cuboidScheduler = new CuboidScheduler(cubeDesc);
         List<Long> allCuboidIds = cuboidScheduler.getAllCuboidIds();
-        final HashMap<Long, HyperLogLogPlusCounterNew> zeroValue = Maps.newHashMap();
+        final HashMap<Long, HLLCounter> zeroValue = Maps.newHashMap();
         for (Long id : allCuboidIds) {
-            zeroValue.put(id, new HyperLogLogPlusCounterNew(cubeDesc.getConfig().getCubeStatsHLLPrecision()));
+            zeroValue.put(id, new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision()));
         }
 
         CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
@@ -278,12 +278,12 @@ public class SparkCubing extends AbstractApplication {
             row_hashcodes[i] = new ByteArray();
         }
 
-        final HashMap<Long, HyperLogLogPlusCounterNew> samplingResult = rowJavaRDD.aggregate(zeroValue, new Function2<HashMap<Long, HyperLogLogPlusCounterNew>, List<String>, HashMap<Long, HyperLogLogPlusCounterNew>>() {
+        final HashMap<Long, HLLCounter> samplingResult = rowJavaRDD.aggregate(zeroValue, new Function2<HashMap<Long, HLLCounter>, List<String>, HashMap<Long, HLLCounter>>() {
 
             final HashFunction hashFunction = Hashing.murmur3_128();
 
             @Override
-            public HashMap<Long, HyperLogLogPlusCounterNew> call(HashMap<Long, HyperLogLogPlusCounterNew> v1, List<String> v2) throws Exception {
+            public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, List<String> v2) throws Exception {
                 for (int i = 0; i < nRowKey; i++) {
                     Hasher hc = hashFunction.newHasher();
                     String colValue = v2.get(rowKeyColumnIndexes[i]);
@@ -296,7 +296,7 @@ public class SparkCubing extends AbstractApplication {
 
                 for (Map.Entry<Long, Integer[]> entry : allCuboidsBitSet.entrySet()) {
                     Hasher hc = hashFunction.newHasher();
-                    HyperLogLogPlusCounterNew counter = v1.get(entry.getKey());
+                    HLLCounter counter = v1.get(entry.getKey());
                     final Integer[] cuboidBitSet = entry.getValue();
                     for (int position = 0; position < cuboidBitSet.length; position++) {
                         hc.putBytes(row_hashcodes[cuboidBitSet[position]].array());
@@ -305,14 +305,14 @@ public class SparkCubing extends AbstractApplication {
                 }
                 return v1;
             }
-        }, new Function2<HashMap<Long, HyperLogLogPlusCounterNew>, HashMap<Long, HyperLogLogPlusCounterNew>, HashMap<Long, HyperLogLogPlusCounterNew>>() {
+        }, new Function2<HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>>() {
             @Override
-            public HashMap<Long, HyperLogLogPlusCounterNew> call(HashMap<Long, HyperLogLogPlusCounterNew> v1, HashMap<Long, HyperLogLogPlusCounterNew> v2) throws Exception {
+            public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, HashMap<Long, HLLCounter> v2) throws Exception {
                 Preconditions.checkArgument(v1.size() == v2.size());
                 Preconditions.checkArgument(v1.size() > 0);
-                for (Map.Entry<Long, HyperLogLogPlusCounterNew> entry : v1.entrySet()) {
-                    final HyperLogLogPlusCounterNew counter1 = entry.getValue();
-                    final HyperLogLogPlusCounterNew counter2 = v2.get(entry.getKey());
+                for (Map.Entry<Long, HLLCounter> entry : v1.entrySet()) {
+                    final HLLCounter counter1 = entry.getValue();
+                    final HLLCounter counter2 = v2.get(entry.getKey());
                     counter1.merge(Preconditions.checkNotNull(counter2, "counter cannot be null"));
                 }
                 return v1;
@@ -470,7 +470,7 @@ public class SparkCubing extends AbstractApplication {
         ClassUtil.addClasspath(confPath);
     }
 
-    private byte[][] createHTable(String cubeName, String segmentId, Map<Long, HyperLogLogPlusCounterNew> samplingResult) throws Exception {
+    private byte[][] createHTable(String cubeName, String segmentId, Map<Long, HLLCounter> samplingResult) throws Exception {
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
         final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
@@ -614,7 +614,7 @@ public class SparkCubing extends AbstractApplication {
             }
         });
 
-        final Map<Long, HyperLogLogPlusCounterNew> samplingResult = sampling(rowJavaRDD, cubeName, segmentId);
+        final Map<Long, HLLCounter> samplingResult = sampling(rowJavaRDD, cubeName, segmentId);
         final byte[][] splitKeys = createHTable(cubeName, segmentId, samplingResult);
 
         final String hfile = build(rowJavaRDD, cubeName, segmentId, splitKeys);
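
Reviewer note: the changes in this file are rename-only; the sampling flow (hash each row-key column with murmur3, feed the combined hash into one counter per cuboid, then merge the per-partition maps) is untouched. For anyone unfamiliar with the renamed class, here is a minimal standalone sketch of that flow, assuming HLLCounter keeps the add() and getCountEstimate() methods of the HyperLogLogPlusCounterNew it replaces (the precision constructor and merge() are visible in the diff above):

    import java.nio.charset.StandardCharsets;

    import org.apache.kylin.measure.hllc.HLLCounter;

    import com.google.common.hash.HashFunction;
    import com.google.common.hash.Hashing;

    public class HLLSamplingSketch {
        public static void main(String[] args) {
            HashFunction hashFunction = Hashing.murmur3_128();
            // precision comes from getCubeStatsHLLPrecision() in the real code; 14 is illustrative
            HLLCounter counter = new HLLCounter(14);
            for (int i = 0; i < 1000000; i++) {
                byte[] hash = hashFunction.newHasher()
                        .putString("row-" + i, StandardCharsets.UTF_8)
                        .hash().asBytes();
                counter.add(hash); // feed the 128-bit murmur3 hash of each row key
            }
            HLLCounter partial = new HLLCounter(14); // counters must share a precision to merge
            partial.add("another-row".getBytes(StandardCharsets.UTF_8));
            counter.merge(partial); // same merge as in the Spark combine function above
            System.out.println("estimated distinct rows: " + counter.getCountEstimate());
        }
    }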

http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
index 230249f..f046f78 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
@@ -35,7 +35,7 @@ import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew;
+import org.apache.kylin.measure.hllc.HLLCounter;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;
@@ -46,7 +46,7 @@ import org.apache.kylin.metadata.model.TableDesc;
  */
 public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritable, BytesWritable> {
 
-    private Map<Integer, HyperLogLogPlusCounterNew> hllcMap = new HashMap<Integer, HyperLogLogPlusCounterNew>();
+    private Map<Integer, HLLCounter> hllcMap = new HashMap<Integer, HLLCounter>();
     public static final String DEFAULT_DELIM = ",";
 
     private int counter = 0;
@@ -87,9 +87,9 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritab
         counter++;
     }
 
-    private HyperLogLogPlusCounterNew getHllc(Integer key) {
+    private HLLCounter getHllc(Integer key) {
         if (!hllcMap.containsKey(key)) {
-            hllcMap.put(key, new HyperLogLogPlusCounterNew());
+            hllcMap.put(key, new HLLCounter());
         }
         return hllcMap.get(key);
     }
@@ -100,7 +100,7 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritab
         ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
         while (it.hasNext()) {
             int key = it.next();
-            HyperLogLogPlusCounterNew hllc = hllcMap.get(key);
+            HLLCounter hllc = hllcMap.get(key);
             buf.clear();
             hllc.writeRegisters(buf);
             buf.flip();
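
Reviewer note: here too only the type name changes. The serialize/deserialize pair this mapper shares with the reducer below round-trips cleanly; a minimal sketch (writeRegisters()/readRegisters(), the buffer constant and the no-arg constructor all appear in this commit; add() and getCountEstimate() are assumed carried over from the old class):

    import java.nio.ByteBuffer;

    import org.apache.kylin.measure.BufferedMeasureCodec;
    import org.apache.kylin.measure.hllc.HLLCounter;

    public class HLLRegistersSketch {
        public static void main(String[] args) throws Exception {
            HLLCounter original = new HLLCounter();
            original.add("some column value");

            // serialize, as the mapper does before emitting a BytesWritable
            ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
            original.writeRegisters(buf);
            buf.flip();

            // deserialize, as the reducer does with each incoming value
            HLLCounter restored = new HLLCounter();
            restored.readRegisters(buf);
            System.out.println("restored estimate: " + restored.getCountEstimate());
        }
    }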

http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
index 32cc6d9..0648960 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.engine.mr.KylinReducer;
 import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew;
+import org.apache.kylin.measure.hllc.HLLCounter;
 
 /**
  * @author Jack
@@ -41,7 +41,7 @@ import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew;
 public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWritable, IntWritable, LongWritable> {
 
     public static final int ONE = 1;
-    private Map<Integer, HyperLogLogPlusCounterNew> hllcMap = new HashMap<Integer, HyperLogLogPlusCounterNew>();
+    private Map<Integer, HLLCounter> hllcMap = new HashMap<Integer, HLLCounter>();
 
     @Override
     protected void setup(Context context) throws IOException {
@@ -53,16 +53,16 @@ public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWri
         int skey = key.get();
         for (BytesWritable v : values) {
             ByteBuffer buffer = ByteBuffer.wrap(v.getBytes());
-            HyperLogLogPlusCounterNew hll = new HyperLogLogPlusCounterNew();
+            HLLCounter hll = new HLLCounter();
             hll.readRegisters(buffer);
             getHllc(skey).merge(hll);
             hll.clear();
         }
     }
 
-    private HyperLogLogPlusCounterNew getHllc(Integer key) {
+    private HLLCounter getHllc(Integer key) {
         if (!hllcMap.containsKey(key)) {
-            hllcMap.put(key, new HyperLogLogPlusCounterNew());
+            hllcMap.put(key, new HLLCounter());
         }
         return hllcMap.get(key);
     }
@@ -78,7 +78,7 @@ public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWri
         it = keys.iterator();
         while (it.hasNext()) {
             int key = it.next();
-            HyperLogLogPlusCounterNew hllc = hllcMap.get(key);
+            HLLCounter hllc = hllcMap.get(key);
             ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
             buf.clear();
             hllc.writeRegisters(buf);
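
Reviewer note: the merge in this reducer is a plain HyperLogLog union, so values seen by more than one mapper are counted once. A minimal sketch of that behaviour, assuming add(String) and getCountEstimate() carry over from the old class (merge() and clear() appear in the diff above):

    import org.apache.kylin.measure.hllc.HLLCounter;

    public class HLLMergeSketch {
        public static void main(String[] args) {
            HLLCounter total = new HLLCounter();
            for (int mapper = 0; mapper < 3; mapper++) {
                HLLCounter partial = new HLLCounter();
                for (int i = 0; i < 1000; i++) {
                    // key ranges overlap by 500 between neighbouring mappers
                    partial.add("key-" + (mapper * 500 + i));
                }
                total.merge(partial);
                partial.clear(); // the reducer clears each incoming counter the same way
            }
            // union of key-0 .. key-1999, so roughly 2000 distinct keys
            System.out.println("merged estimate: " + total.getCountEstimate());
        }
    }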

http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java
----------------------------------------------------------------------
diff --git a/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java b/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java
index 410543a..c32e76d 100644
--- a/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java
+++ b/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
 import org.apache.hadoop.mrunit.types.Pair;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew;
+import org.apache.kylin.measure.hllc.HLLCounter;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -57,7 +57,7 @@ public class ColumnCardinalityReducerTest {
     }
 
     private byte[] getBytes(String str) throws IOException {
-        HyperLogLogPlusCounterNew hllc = new HyperLogLogPlusCounterNew();
+        HLLCounter hllc = new HLLCounter();
         StringTokenizer tokenizer = new StringTokenizer(str, ColumnCardinalityMapper.DEFAULT_DELIM);
         int i = 0;
         while (tokenizer.hasMoreTokens()) {
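
Reviewer note: the test helper above builds a counter from a comma-delimited string and serializes its registers. A self-contained sketch of that pattern under the same assumptions as the earlier ones (the tokenizer, delimiter and constructor appear above; the remaining calls are taken from the other files in this commit, not from the truncated diff):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.StringTokenizer;

    import org.apache.kylin.measure.BufferedMeasureCodec;
    import org.apache.kylin.measure.hllc.HLLCounter;

    public class GetBytesSketch {
        static byte[] getBytes(String str) throws IOException {
            HLLCounter hllc = new HLLCounter();
            StringTokenizer tokenizer = new StringTokenizer(str, ",");
            while (tokenizer.hasMoreTokens()) {
                hllc.add(tokenizer.nextToken()); // each token is one value to count
            }
            ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
            hllc.writeRegisters(buf);
            buf.flip();
            byte[] bytes = new byte[buf.limit()];
            buf.get(bytes);
            return bytes;
        }

        public static void main(String[] args) throws IOException {
            System.out.println("serialized length: " + getBytes("a,b,c,a").length);
        }
    }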