You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/12/14 07:30:49 UTC
[3/5] kylin git commit: KYLIN-1832 code review
http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 76212c8..6e894dd 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -83,7 +83,7 @@ import org.apache.kylin.engine.spark.cube.DefaultTupleConverter;
import org.apache.kylin.engine.spark.util.IteratorUtils;
import org.apache.kylin.measure.BufferedMeasureCodec;
import org.apache.kylin.measure.MeasureAggregators;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew;
+import org.apache.kylin.measure.hllc.HLLCounter;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
@@ -241,15 +241,15 @@ public class SparkCubing extends AbstractApplication {
}
}
- private Map<Long, HyperLogLogPlusCounterNew> sampling(final JavaRDD<List<String>> rowJavaRDD, final String cubeName, String segmentId) throws Exception {
+ private Map<Long, HLLCounter> sampling(final JavaRDD<List<String>> rowJavaRDD, final String cubeName, String segmentId) throws Exception {
CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
CubeDesc cubeDesc = cubeInstance.getDescriptor();
CuboidScheduler cuboidScheduler = new CuboidScheduler(cubeDesc);
List<Long> allCuboidIds = cuboidScheduler.getAllCuboidIds();
- final HashMap<Long, HyperLogLogPlusCounterNew> zeroValue = Maps.newHashMap();
+ final HashMap<Long, HLLCounter> zeroValue = Maps.newHashMap();
for (Long id : allCuboidIds) {
- zeroValue.put(id, new HyperLogLogPlusCounterNew(cubeDesc.getConfig().getCubeStatsHLLPrecision()));
+ zeroValue.put(id, new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision()));
}
CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
@@ -278,12 +278,12 @@ public class SparkCubing extends AbstractApplication {
row_hashcodes[i] = new ByteArray();
}
- final HashMap<Long, HyperLogLogPlusCounterNew> samplingResult = rowJavaRDD.aggregate(zeroValue, new Function2<HashMap<Long, HyperLogLogPlusCounterNew>, List<String>, HashMap<Long, HyperLogLogPlusCounterNew>>() {
+ final HashMap<Long, HLLCounter> samplingResult = rowJavaRDD.aggregate(zeroValue, new Function2<HashMap<Long, HLLCounter>, List<String>, HashMap<Long, HLLCounter>>() {
final HashFunction hashFunction = Hashing.murmur3_128();
@Override
- public HashMap<Long, HyperLogLogPlusCounterNew> call(HashMap<Long, HyperLogLogPlusCounterNew> v1, List<String> v2) throws Exception {
+ public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, List<String> v2) throws Exception {
for (int i = 0; i < nRowKey; i++) {
Hasher hc = hashFunction.newHasher();
String colValue = v2.get(rowKeyColumnIndexes[i]);
@@ -296,7 +296,7 @@ public class SparkCubing extends AbstractApplication {
for (Map.Entry<Long, Integer[]> entry : allCuboidsBitSet.entrySet()) {
Hasher hc = hashFunction.newHasher();
- HyperLogLogPlusCounterNew counter = v1.get(entry.getKey());
+ HLLCounter counter = v1.get(entry.getKey());
final Integer[] cuboidBitSet = entry.getValue();
for (int position = 0; position < cuboidBitSet.length; position++) {
hc.putBytes(row_hashcodes[cuboidBitSet[position]].array());
@@ -305,14 +305,14 @@ public class SparkCubing extends AbstractApplication {
}
return v1;
}
- }, new Function2<HashMap<Long, HyperLogLogPlusCounterNew>, HashMap<Long, HyperLogLogPlusCounterNew>, HashMap<Long, HyperLogLogPlusCounterNew>>() {
+ }, new Function2<HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>>() {
@Override
- public HashMap<Long, HyperLogLogPlusCounterNew> call(HashMap<Long, HyperLogLogPlusCounterNew> v1, HashMap<Long, HyperLogLogPlusCounterNew> v2) throws Exception {
+ public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, HashMap<Long, HLLCounter> v2) throws Exception {
Preconditions.checkArgument(v1.size() == v2.size());
Preconditions.checkArgument(v1.size() > 0);
- for (Map.Entry<Long, HyperLogLogPlusCounterNew> entry : v1.entrySet()) {
- final HyperLogLogPlusCounterNew counter1 = entry.getValue();
- final HyperLogLogPlusCounterNew counter2 = v2.get(entry.getKey());
+ for (Map.Entry<Long, HLLCounter> entry : v1.entrySet()) {
+ final HLLCounter counter1 = entry.getValue();
+ final HLLCounter counter2 = v2.get(entry.getKey());
counter1.merge(Preconditions.checkNotNull(counter2, "counter cannot be null"));
}
return v1;
@@ -470,7 +470,7 @@ public class SparkCubing extends AbstractApplication {
ClassUtil.addClasspath(confPath);
}
- private byte[][] createHTable(String cubeName, String segmentId, Map<Long, HyperLogLogPlusCounterNew> samplingResult) throws Exception {
+ private byte[][] createHTable(String cubeName, String segmentId, Map<Long, HLLCounter> samplingResult) throws Exception {
final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
@@ -614,7 +614,7 @@ public class SparkCubing extends AbstractApplication {
}
});
- final Map<Long, HyperLogLogPlusCounterNew> samplingResult = sampling(rowJavaRDD, cubeName, segmentId);
+ final Map<Long, HLLCounter> samplingResult = sampling(rowJavaRDD, cubeName, segmentId);
final byte[][] splitKeys = createHTable(cubeName, segmentId, samplingResult);
final String hfile = build(rowJavaRDD, cubeName, segmentId, splitKeys);
http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
index 230249f..f046f78 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
@@ -35,7 +35,7 @@ import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew;
+import org.apache.kylin.measure.hllc.HLLCounter;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.TableDesc;
@@ -46,7 +46,7 @@ import org.apache.kylin.metadata.model.TableDesc;
*/
public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritable, BytesWritable> {
- private Map<Integer, HyperLogLogPlusCounterNew> hllcMap = new HashMap<Integer, HyperLogLogPlusCounterNew>();
+ private Map<Integer, HLLCounter> hllcMap = new HashMap<Integer, HLLCounter>();
public static final String DEFAULT_DELIM = ",";
private int counter = 0;
@@ -87,9 +87,9 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritab
counter++;
}
- private HyperLogLogPlusCounterNew getHllc(Integer key) {
+ private HLLCounter getHllc(Integer key) {
if (!hllcMap.containsKey(key)) {
- hllcMap.put(key, new HyperLogLogPlusCounterNew());
+ hllcMap.put(key, new HLLCounter());
}
return hllcMap.get(key);
}
@@ -100,7 +100,7 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritab
ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
while (it.hasNext()) {
int key = it.next();
- HyperLogLogPlusCounterNew hllc = hllcMap.get(key);
+ HLLCounter hllc = hllcMap.get(key);
buf.clear();
hllc.writeRegisters(buf);
buf.flip();
http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
index 32cc6d9..0648960 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.kylin.engine.mr.KylinReducer;
import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew;
+import org.apache.kylin.measure.hllc.HLLCounter;
/**
* @author Jack
@@ -41,7 +41,7 @@ import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew;
public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWritable, IntWritable, LongWritable> {
public static final int ONE = 1;
- private Map<Integer, HyperLogLogPlusCounterNew> hllcMap = new HashMap<Integer, HyperLogLogPlusCounterNew>();
+ private Map<Integer, HLLCounter> hllcMap = new HashMap<Integer, HLLCounter>();
@Override
protected void setup(Context context) throws IOException {
@@ -53,16 +53,16 @@ public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWri
int skey = key.get();
for (BytesWritable v : values) {
ByteBuffer buffer = ByteBuffer.wrap(v.getBytes());
- HyperLogLogPlusCounterNew hll = new HyperLogLogPlusCounterNew();
+ HLLCounter hll = new HLLCounter();
hll.readRegisters(buffer);
getHllc(skey).merge(hll);
hll.clear();
}
}
- private HyperLogLogPlusCounterNew getHllc(Integer key) {
+ private HLLCounter getHllc(Integer key) {
if (!hllcMap.containsKey(key)) {
- hllcMap.put(key, new HyperLogLogPlusCounterNew());
+ hllcMap.put(key, new HLLCounter());
}
return hllcMap.get(key);
}
@@ -78,7 +78,7 @@ public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWri
it = keys.iterator();
while (it.hasNext()) {
int key = it.next();
- HyperLogLogPlusCounterNew hllc = hllcMap.get(key);
+ HLLCounter hllc = hllcMap.get(key);
ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
buf.clear();
hllc.writeRegisters(buf);
http://git-wip-us.apache.org/repos/asf/kylin/blob/e6e330a8/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java
----------------------------------------------------------------------
diff --git a/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java b/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java
index 410543a..c32e76d 100644
--- a/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java
+++ b/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew;
+import org.apache.kylin.measure.hllc.HLLCounter;
import org.junit.Before;
import org.junit.Test;
@@ -57,7 +57,7 @@ public class ColumnCardinalityReducerTest {
}
private byte[] getBytes(String str) throws IOException {
- HyperLogLogPlusCounterNew hllc = new HyperLogLogPlusCounterNew();
+ HLLCounter hllc = new HLLCounter();
StringTokenizer tokenizer = new StringTokenizer(str, ColumnCardinalityMapper.DEFAULT_DELIM);
int i = 0;
while (tokenizer.hasMoreTokens()) {