You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2017/01/07 02:17:49 UTC
[07/10] kylin git commit: KYLIN-2331 by layer spark cubing
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
index 8951b69..710f324 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
@@ -84,6 +84,24 @@ public class MeasureAggregators implements Serializable {
}
}
+ public void aggregate(Object[] values1, Object[] values2, Object[] result) {
+ assert values1.length == values2.length && values2.length == descLength && values1.length == result.length;
+
+ for (int i = 0; i < descLength; i++) {
+ result[i] = aggs[i].aggregate(values1[i], values2[i]);
+ }
+
+ }
+
+ public void aggregate(Object[] values1, Object[] values2, Object[] result, boolean[] aggrMask) {
+ assert values1.length == values2.length && values2.length == descLength && values1.length == result.length && result.length == aggrMask.length;
+ for (int i = 0; i < descLength; i++) {
+ if (aggrMask[i]) {
+ result[i] = aggs[i].aggregate(values1[i], values2[i]);
+ }
+ }
+ }
+
public void collectStates(Object[] states) {
for (int i = 0; i < descLength; i++) {
states[i] = aggs[i].getState();
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
index edaf806..2d73e59 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
@@ -18,19 +18,19 @@
package org.apache.kylin.measure;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.datatype.DataTypeSerializer;
import org.apache.kylin.metadata.model.MeasureDesc;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
/**
* @author yangli9
*
*/
@SuppressWarnings({ "rawtypes" })
-public class MeasureCodec {
+public class MeasureCodec implements java.io.Serializable {
private int nMeasures;
private DataTypeSerializer[] serializers;
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
index 0076252..26b7298 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
@@ -18,14 +18,14 @@
package org.apache.kylin.measure;
-import java.util.Collection;
-import java.util.Map;
-
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
-abstract public class MeasureIngester<V> {
+import java.util.Collection;
+import java.util.Map;
+
+abstract public class MeasureIngester<V> implements java.io.Serializable {
public static MeasureIngester<?> create(MeasureDesc measure) {
return measure.getFunction().getMeasureType().newIngester();
@@ -42,6 +42,10 @@ abstract public class MeasureIngester<V> {
abstract public V valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap);
+ public void reset() {
+
+ }
+
public V reEncodeDictionary(V value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
index 89ff382..3338c8c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
@@ -18,11 +18,6 @@
package org.apache.kylin.measure;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
@@ -32,13 +27,18 @@ import org.apache.kylin.metadata.realization.SQLDigest;
import org.apache.kylin.metadata.tuple.Tuple;
import org.apache.kylin.metadata.tuple.TupleInfo;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
/**
* MeasureType captures how a kind of aggregation is defined, how it is calculated
* during cube build, and how it is involved in query and storage scan.
*
* @param <T> the Java type of aggregation data object, e.g. HLLCounter
*/
-abstract public class MeasureType<T> {
+abstract public class MeasureType<T> implements java.io.Serializable {
/* ============================================================================
* Define
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMaxAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMaxAggregator.java
index aa42476..7a57965 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMaxAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMaxAggregator.java
@@ -43,6 +43,20 @@ public class BigDecimalMaxAggregator extends MeasureAggregator<BigDecimal> {
}
@Override
+ public BigDecimal aggregate(BigDecimal value1, BigDecimal value2) {
+ if (value1 == null) {
+ return value2;
+ } else if (value2 == null) {
+ return value1;
+ }
+
+ if (value1.compareTo(value2) > 0)
+ return value1;
+ else
+ return value2;
+ }
+
+ @Override
public BigDecimal getState() {
return max;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMinAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMinAggregator.java
index 81193ad..71ba7fb 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMinAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMinAggregator.java
@@ -27,24 +27,38 @@ import org.apache.kylin.measure.MeasureAggregator;
@SuppressWarnings("serial")
public class BigDecimalMinAggregator extends MeasureAggregator<BigDecimal> {
- BigDecimal max = null;
+ BigDecimal min = null;
@Override
public void reset() {
- max = null;
+ min = null;
}
@Override
public void aggregate(BigDecimal value) {
- if (max == null)
- max = value;
- else if (max.compareTo(value) > 0)
- max = value;
+ if (min == null)
+ min = value;
+ else if (min.compareTo(value) > 0)
+ min = value;
+ }
+
+ @Override
+ public BigDecimal aggregate(BigDecimal value1, BigDecimal value2) {
+ if (value1 == null) {
+ return value2;
+ } else if (value2 == null) {
+ return value1;
+ }
+
+ if (value1.compareTo(value2) > 0)
+ return value2;
+ else
+ return value1;
}
@Override
public BigDecimal getState() {
- return max;
+ return min;
}
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalSumAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalSumAggregator.java
index 5e00c63..9f6ffc2 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalSumAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalSumAggregator.java
@@ -40,6 +40,16 @@ public class BigDecimalSumAggregator extends MeasureAggregator<BigDecimal> {
}
@Override
+ public BigDecimal aggregate(BigDecimal value1, BigDecimal value2) {
+ if (value1 == null) {
+ return value2;
+ } else if (value2 == null) {
+ return value1;
+ }
+ return value1.add(value2);
+ }
+
+ @Override
public BigDecimal getState() {
return sum;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
index a2f3980..90ecb0d 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
@@ -22,25 +22,24 @@ import java.util.Map;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.measure.MeasureIngester;
-import org.apache.kylin.metadata.datatype.DoubleMutable;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
-public class DoubleIngester extends MeasureIngester<DoubleMutable> {
-
- // avoid repeated object creation
- private DoubleMutable current = new DoubleMutable();
+public class DoubleIngester extends MeasureIngester<Double> {
@Override
- public DoubleMutable valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+ public Double valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
if (values.length > 1)
throw new IllegalArgumentException();
- DoubleMutable l = current;
if (values[0] == null || values[0].length() == 0)
- l.set(0L);
+ return new Double(0);
else
- l.set(Double.parseDouble(values[0]));
- return l;
+ return Double.parseDouble(values[0]);
+ }
+
+ @Override
+ public void reset() {
+
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMaxAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMaxAggregator.java
index 25911e8..f33555e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMaxAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMaxAggregator.java
@@ -19,14 +19,13 @@
package org.apache.kylin.measure.basic;
import org.apache.kylin.measure.MeasureAggregator;
-import org.apache.kylin.metadata.datatype.DoubleMutable;
/**
*/
@SuppressWarnings("serial")
-public class DoubleMaxAggregator extends MeasureAggregator<DoubleMutable> {
+public class DoubleMaxAggregator extends MeasureAggregator<Double> {
- DoubleMutable max = null;
+ Double max = null;
@Override
public void reset() {
@@ -34,15 +33,20 @@ public class DoubleMaxAggregator extends MeasureAggregator<DoubleMutable> {
}
@Override
- public void aggregate(DoubleMutable value) {
+ public void aggregate(Double value) {
if (max == null)
- max = new DoubleMutable(value.get());
- else if (max.get() < value.get())
- max.set(value.get());
+ max = value;
+ else if (max < value)
+ max = value;
}
@Override
- public DoubleMutable getState() {
+ public Double aggregate(Double value1, Double value2) {
+ return Math.max(value1, value2);
+ }
+
+ @Override
+ public Double getState() {
return max;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMinAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMinAggregator.java
index be97deb..8e69f21 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMinAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMinAggregator.java
@@ -19,14 +19,13 @@
package org.apache.kylin.measure.basic;
import org.apache.kylin.measure.MeasureAggregator;
-import org.apache.kylin.metadata.datatype.DoubleMutable;
/**
*/
@SuppressWarnings("serial")
-public class DoubleMinAggregator extends MeasureAggregator<DoubleMutable> {
+public class DoubleMinAggregator extends MeasureAggregator<Double> {
- DoubleMutable min = null;
+ Double min = null;
@Override
public void reset() {
@@ -34,15 +33,20 @@ public class DoubleMinAggregator extends MeasureAggregator<DoubleMutable> {
}
@Override
- public void aggregate(DoubleMutable value) {
+ public void aggregate(Double value) {
if (min == null)
- min = new DoubleMutable(value.get());
- else if (min.get() > value.get())
- min.set(value.get());
+ min = value;
+ else if (min > value)
+ min = value;
}
@Override
- public DoubleMutable getState() {
+ public Double aggregate(Double value1, Double value2) {
+ return Math.min(value1, value2);
+ }
+
+ @Override
+ public Double getState() {
return min;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java
index f276817..df0ba52 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java
@@ -19,27 +19,31 @@
package org.apache.kylin.measure.basic;
import org.apache.kylin.measure.MeasureAggregator;
-import org.apache.kylin.metadata.datatype.DoubleMutable;
/**
*/
@SuppressWarnings("serial")
-public class DoubleSumAggregator extends MeasureAggregator<DoubleMutable> {
+public class DoubleSumAggregator extends MeasureAggregator<Double> {
- DoubleMutable sum = new DoubleMutable();
+ Double sum = new Double(0);
@Override
public void reset() {
- sum.set(0.0);
+ sum = new Double(0);
}
@Override
- public void aggregate(DoubleMutable value) {
- sum.set(sum.get() + value.get());
+ public void aggregate(Double value) {
+ sum = sum + value;
}
@Override
- public DoubleMutable getState() {
+ public Double aggregate(Double value1, Double value2) {
+ return Double.valueOf(value1 + value2);
+ }
+
+ @Override
+ public Double getState() {
return sum;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
index 45a1634..793acf2 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
@@ -22,25 +22,24 @@ import java.util.Map;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.measure.MeasureIngester;
-import org.apache.kylin.metadata.datatype.LongMutable;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
-public class LongIngester extends MeasureIngester<LongMutable> {
-
- // avoid repeated object creation
- private LongMutable current = new LongMutable();
+public class LongIngester extends MeasureIngester<Long> {
@Override
- public LongMutable valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+ public Long valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
if (values.length > 1)
throw new IllegalArgumentException();
- LongMutable l = current;
if (values[0] == null || values[0].length() == 0)
- l.set(0L);
+ return new Long(0L);
else
- l.set(Long.parseLong(values[0]));
- return l;
+ return Long.valueOf(values[0]);
+ }
+
+ @Override
+ public void reset() {
+
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMaxAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMaxAggregator.java
index ca44f15..b9a2b94 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMaxAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMaxAggregator.java
@@ -19,14 +19,13 @@
package org.apache.kylin.measure.basic;
import org.apache.kylin.measure.MeasureAggregator;
-import org.apache.kylin.metadata.datatype.LongMutable;
/**
*/
@SuppressWarnings("serial")
-public class LongMaxAggregator extends MeasureAggregator<LongMutable> {
+public class LongMaxAggregator extends MeasureAggregator<Long> {
- LongMutable max = null;
+ Long max = null;
@Override
public void reset() {
@@ -34,15 +33,20 @@ public class LongMaxAggregator extends MeasureAggregator<LongMutable> {
}
@Override
- public void aggregate(LongMutable value) {
+ public void aggregate(Long value) {
if (max == null)
- max = new LongMutable(value.get());
- else if (max.get() < value.get())
- max.set(value.get());
+ max = value;
+ else if (max < value)
+ max = value;
}
@Override
- public LongMutable getState() {
+ public Long aggregate(Long value1, Long value2) {
+ return Math.max(value1, value2);
+ }
+
+ @Override
+ public Long getState() {
return max;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMinAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMinAggregator.java
index dadc64e..9185142 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMinAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMinAggregator.java
@@ -19,14 +19,13 @@
package org.apache.kylin.measure.basic;
import org.apache.kylin.measure.MeasureAggregator;
-import org.apache.kylin.metadata.datatype.LongMutable;
/**
*/
@SuppressWarnings("serial")
-public class LongMinAggregator extends MeasureAggregator<LongMutable> {
+public class LongMinAggregator extends MeasureAggregator<Long> {
- LongMutable min = null;
+ Long min = null;
@Override
public void reset() {
@@ -34,15 +33,20 @@ public class LongMinAggregator extends MeasureAggregator<LongMutable> {
}
@Override
- public void aggregate(LongMutable value) {
+ public void aggregate(Long value) {
if (min == null)
- min = new LongMutable(value.get());
- else if (min.get() > value.get())
- min.set(value.get());
+ min = value;
+ else if (min > value)
+ min = value;
}
@Override
- public LongMutable getState() {
+ public Long aggregate(Long value1, Long value2) {
+ return Math.min(value1, value2);
+ }
+
+ @Override
+ public Long getState() {
return min;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java
index e7fdc9d..1f9c0d7 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java
@@ -19,27 +19,31 @@
package org.apache.kylin.measure.basic;
import org.apache.kylin.measure.MeasureAggregator;
-import org.apache.kylin.metadata.datatype.LongMutable;
/**
*/
@SuppressWarnings("serial")
-public class LongSumAggregator extends MeasureAggregator<LongMutable> {
+public class LongSumAggregator extends MeasureAggregator<Long> {
- LongMutable sum = new LongMutable();
+ Long sum = new Long(0L);
@Override
public void reset() {
- sum.set(0);
+ sum = new Long(0L);
}
@Override
- public void aggregate(LongMutable value) {
- sum.set(sum.get() + value.get());
+ public void aggregate(Long value) {
+ sum += value;
}
@Override
- public LongMutable getState() {
+ public Long aggregate(Long value1, Long value2) {
+ return Long.valueOf(value1 + value2);
+ }
+
+ @Override
+ public Long getState() {
return sum;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java
index be72090..cd0b4bb 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java
@@ -42,6 +42,19 @@ public class BitmapAggregator extends MeasureAggregator<BitmapCounter> {
}
@Override
+ public BitmapCounter aggregate(BitmapCounter value1, BitmapCounter value2) {
+ if (value1 == null) {
+ return new BitmapCounter(value2);
+ } else if (value2 == null) {
+ return new BitmapCounter(value1);
+ }
+
+ BitmapCounter merged = new BitmapCounter(value1);
+ merged.merge(value2);
+ return merged;
+ }
+
+ @Override
public BitmapCounter getState() {
return sum;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java
index 827390d..caab094 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java
@@ -18,6 +18,9 @@
package org.apache.kylin.measure.bitmap;
+import org.apache.commons.io.IOUtils;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@@ -26,13 +29,10 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Iterator;
-import org.apache.commons.io.IOUtils;
-import org.roaringbitmap.buffer.MutableRoaringBitmap;
-
/**
* Created by sunyerui on 15/12/1.
*/
-public class BitmapCounter implements Comparable<BitmapCounter> {
+public class BitmapCounter implements Comparable<BitmapCounter>, java.io.Serializable {
private MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
index 8e2b2f7..6ad82a1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
@@ -138,6 +138,11 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> {
}
return retValue;
}
+
+ @Override
+ public void reset() {
+ current = new BitmapCounter();
+ }
};
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
index 089d18c..c40f71b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
@@ -29,7 +29,7 @@ import org.apache.kylin.metadata.datatype.DataTypeSerializer;
*/
public class BitmapSerializer extends DataTypeSerializer<BitmapCounter> {
- private ThreadLocal<BitmapCounter> current = new ThreadLocal<>();
+ private transient ThreadLocal<BitmapCounter> current = new ThreadLocal<>();
public BitmapSerializer(DataType type) {
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java
index 6fa8788..1b2cda3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java
@@ -254,6 +254,21 @@ public class ExtendedColumnMeasureType extends MeasureType<ByteArray> {
}
@Override
+ public ByteArray aggregate(ByteArray value1, ByteArray value2) {
+ if (value1 == null) {
+ return value2;
+ } else if (value2 == null) {
+ return value1;
+ } else if (!value1.equals(value2)) {
+ if (!warned) {
+ logger.warn("Extended column must be unique given same host column");
+ warned = true;
+ }
+ }
+ return value1;
+ }
+
+ @Override
public ByteArray getState() {
return byteArray;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/hllc/DenseRegister.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/DenseRegister.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/DenseRegister.java
index 5b929b2..4be9f71 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/DenseRegister.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/DenseRegister.java
@@ -24,7 +24,7 @@ import java.util.Map;
/**
* Created by xiefan on 16-12-9.
*/
-public class DenseRegister implements Register {
+public class DenseRegister implements Register, java.io.Serializable {
private int m;
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java
index 5966c04..7d5b216 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java
@@ -46,6 +46,13 @@ public class HLLCAggregator extends MeasureAggregator<HLLCounter> {
}
@Override
+ public HLLCounter aggregate(HLLCounter value1, HLLCounter value2) {
+ HLLCounter result = new HLLCounter(value1);
+ result.merge(value2);
+ return result;
+ }
+
+ @Override
public HLLCounter getState() {
return sum;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
index 9601653..de36b08 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
@@ -105,6 +105,11 @@ public class HLLCMeasureType extends MeasureType<HLLCounter> {
}
return hllc;
}
+
+ @Override
+ public void reset() {
+ current = new HLLCounter(dataType.getPrecision());
+ }
};
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
index e0992c7..df0cfaf 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
@@ -18,21 +18,18 @@
package org.apache.kylin.measure.hllc;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
/**
* @author yangli9
*
*/
public class HLLCSerializer extends DataTypeSerializer<HLLCounter> {
- // be thread-safe and avoid repeated obj creation
- private ThreadLocal<HLLCounter> current = new ThreadLocal<HLLCounter>();
-
private int precision;
public HLLCSerializer(DataType type) {
@@ -49,7 +46,7 @@ public class HLLCSerializer extends DataTypeSerializer<HLLCounter> {
}
private HLLCounter current() {
- HLLCounter hllc = current.get();
+ HLLCounter hllc = (HLLCounter) current.get();
if (hllc == null) {
hllc = new HLLCounter(precision);
current.set(hllc);
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusTable.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusTable.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusTable.java
index 9d8de07..5d7bfeb 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusTable.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusTable.java
@@ -25,7 +25,7 @@ import java.util.TreeMap;
* @author xjiang
*
*/
-public class HyperLogLogPlusTable {
+public class HyperLogLogPlusTable implements java.io.Serializable {
// threshold and bias data taken from google's bias correction data set:
// https://docs.google.com/document/d/1gyjfMHy43U9OWBXxfaeG-3MjGzejW1dlpyMwEYAAWEI/view?fullscreen#
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/hllc/SingleValueRegister.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/SingleValueRegister.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/SingleValueRegister.java
index 7f612e2..ad6306a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/SingleValueRegister.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/SingleValueRegister.java
@@ -20,7 +20,7 @@ package org.apache.kylin.measure.hllc;
/**
* Created by xiefan on 16-12-20.
*/
-public class SingleValueRegister implements Register {
+public class SingleValueRegister implements Register, java.io.Serializable {
private int singleValuePos;
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/hllc/SparseRegister.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/SparseRegister.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/SparseRegister.java
index dd7d7c8..d47a05b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/SparseRegister.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/SparseRegister.java
@@ -25,7 +25,7 @@ import java.util.TreeMap;
/**
* Created by xiefan on 16-12-9.
*/
-public class SparseRegister implements Register {
+public class SparseRegister implements Register, java.io.Serializable {
private Map<Integer, Byte> sparseRegister = new TreeMap<>();
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawAggregator.java
index 6f66269..c3ecc74 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawAggregator.java
@@ -47,6 +47,20 @@ public class RawAggregator extends MeasureAggregator<List<ByteArray>> {
}
@Override
+ public List<ByteArray> aggregate(List<ByteArray> value1, List<ByteArray> value2) {
+ if (value1 == null) {
+ return value2;
+ } else if (value2 == null) {
+ return value1;
+ }
+
+ List<ByteArray> result = new ArrayList<>(value1.size() + value2.size());
+ result.addAll(value1);
+ result.addAll(value2);
+ return result;
+ }
+
+ @Override
public List<ByteArray> getState() {
return list;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawSerializer.java
index 021c146..68a0273 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawSerializer.java
@@ -34,13 +34,11 @@ public class RawSerializer extends DataTypeSerializer<List<ByteArray>> {
//FIXME to config this and RowConstants.ROWVALUE_BUFFER_SIZE in properties file
public static final int RAW_BUFFER_SIZE = 1024 * 1024;//1M
- private ThreadLocal<List<ByteArray>> current = new ThreadLocal<>();
-
public RawSerializer(DataType dataType) {
}
private List<ByteArray> current() {
- List<ByteArray> l = current.get();
+ List<ByteArray> l = (List<ByteArray>) current.get();
if (l == null) {
l = new ArrayList<ByteArray>();
current.set(l);
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/topn/DoubleDeltaSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/DoubleDeltaSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/DoubleDeltaSerializer.java
index 38d5b20..ac925e2 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/DoubleDeltaSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/DoubleDeltaSerializer.java
@@ -24,7 +24,7 @@ import java.nio.ByteBuffer;
*
* http://bitcharmer.blogspot.co.uk/2013/12/how-to-serialise-array-of-doubles-with.html
*/
-public class DoubleDeltaSerializer {
+public class DoubleDeltaSerializer implements java.io.Serializable {
// first 32 bits stores meta info
static final int PRECISION_BITS = 3;
@@ -55,7 +55,6 @@ public class DoubleDeltaSerializer {
this.precision = precision;
this.multiplier = (int) Math.pow(10, precision);
- this.deltasThreadLocal = new ThreadLocal<long[]>();
}
public void serialize(double[] values, ByteBuffer buf) {
@@ -112,6 +111,10 @@ public class DoubleDeltaSerializer {
int len = values.length - 1;
len = Math.max(0, len);
+ if (deltasThreadLocal == null) {
+ deltasThreadLocal = new ThreadLocal<>();
+ }
+
long[] deltas = deltasThreadLocal.get();
if (deltas == null || deltas.length < len) {
deltas = new long[len];
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java
index ef997eb..b5e316f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java
@@ -45,6 +45,15 @@ public class TopNAggregator extends MeasureAggregator<TopNCounter<ByteArray>> {
}
@Override
+ public TopNCounter<ByteArray> aggregate(TopNCounter<ByteArray> value1, TopNCounter<ByteArray> value2) {
+ TopNCounter<ByteArray> aggregated = new TopNCounter<>(capacity * 2);
+ aggregated.merge(value1);
+ aggregated.merge(value2);
+ aggregated.retain(capacity);
+ return aggregated;
+ }
+
+ @Override
public TopNCounter<ByteArray> getState() {
sum.retain(capacity);
return sum;
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
index caf7961..5e4b91e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
@@ -40,7 +40,7 @@ import com.google.common.collect.Maps;
*
* @param <T> type of data in the stream to be summarized
*/
-public class TopNCounter<T> implements Iterable<Counter<T>> {
+public class TopNCounter<T> implements Iterable<Counter<T>>, java.io.Serializable {
public static final int EXTRA_SPACE_RATE = 50;
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
index c29af6c..8c8b5a6 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Dictionary;
@@ -46,8 +47,6 @@ import org.apache.kylin.metadata.tuple.TupleInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-
public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> {
private static final Logger logger = LoggerFactory.getLogger(TopNMeasureType.class);
@@ -156,6 +155,7 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> {
return topNCounter;
}
+
@Override
public TopNCounter<ByteArray> reEncodeDictionary(TopNCounter<ByteArray> value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts) {
TopNCounter<ByteArray> topNCounter = value;
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java
index 64968b8..b5043f5 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java
@@ -35,7 +35,7 @@ public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> {
private static final Logger logger = LoggerFactory.getLogger(BigDecimalSerializer.class);
final DataType type;
- transient final int maxLength;
+ final int maxLength;
transient int avoidVerbose = 0;
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BooleanSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BooleanSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BooleanSerializer.java
index acb6de1..3d485d2 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BooleanSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BooleanSerializer.java
@@ -18,40 +18,26 @@
package org.apache.kylin.metadata.datatype;
-import java.nio.ByteBuffer;
-
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.BooleanUtils;
-public class BooleanSerializer extends DataTypeSerializer<LongMutable> {
+import java.nio.ByteBuffer;
- public final static String[] TRUE_VALUE_SET = { "true", "t", "on", "yes" };
+public class BooleanSerializer extends DataTypeSerializer<Long> {
- // be thread-safe and avoid repeated obj creation
- private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>();
+ public final static String[] TRUE_VALUE_SET = { "true", "t", "on", "yes" };
public BooleanSerializer(DataType type) {
}
@Override
- public void serialize(LongMutable value, ByteBuffer out) {
- out.putLong(value.get());
- }
-
- private LongMutable current() {
- LongMutable l = current.get();
- if (l == null) {
- l = new LongMutable();
- current.set(l);
- }
- return l;
+ public void serialize(Long value, ByteBuffer out) {
+ out.putLong(value);
}
@Override
- public LongMutable deserialize(ByteBuffer in) {
- LongMutable l = current();
- l.set(in.getLong());
- return l;
+ public Long deserialize(ByteBuffer in) {
+ return in.getLong();
}
@Override
@@ -70,12 +56,10 @@ public class BooleanSerializer extends DataTypeSerializer<LongMutable> {
}
@Override
- public LongMutable valueOf(String str) {
- LongMutable l = current();
+ public Long valueOf(String str) {
if (str == null)
- l.set(0L);
+ return Long.valueOf(0L);
else
- l.set(BooleanUtils.toInteger(ArrayUtils.contains(TRUE_VALUE_SET, str.toLowerCase())));
- return l;
+ return Long.valueOf(BooleanUtils.toInteger(ArrayUtils.contains(TRUE_VALUE_SET, str.toLowerCase())));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
index a739377..a4a35a4 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
@@ -18,19 +18,21 @@
package org.apache.kylin.metadata.datatype;
-import java.nio.ByteBuffer;
-import java.util.Map;
-
+import com.google.common.collect.Maps;
import org.apache.kylin.common.util.BytesSerializer;
-import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.nio.ByteBuffer;
+import java.util.Map;
/**
* Note: the implementations MUST be thread-safe.
*/
-abstract public class DataTypeSerializer<T> implements BytesSerializer<T> {
+abstract public class DataTypeSerializer<T> implements BytesSerializer<T>, java.io.Serializable {
final static Map<String, Class<?>> implementations = Maps.newHashMap();
+ protected transient ThreadLocal current = new ThreadLocal();
static {
implementations.put("char", StringSerializer.class);
implementations.put("varchar", StringSerializer.class);
@@ -94,4 +96,9 @@ abstract public class DataTypeSerializer<T> implements BytesSerializer<T> {
else
return value.toString();
}
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ current = new ThreadLocal();
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java
index 07f98b3..5101766 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java
@@ -18,37 +18,23 @@
package org.apache.kylin.metadata.datatype;
-import java.nio.ByteBuffer;
-
import org.apache.kylin.common.util.DateFormat;
-public class DateTimeSerializer extends DataTypeSerializer<LongMutable> {
+import java.nio.ByteBuffer;
- // be thread-safe and avoid repeated obj creation
- private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>();
+public class DateTimeSerializer extends DataTypeSerializer<Long> {
public DateTimeSerializer(DataType type) {
}
@Override
- public void serialize(LongMutable value, ByteBuffer out) {
- out.putLong(value.get());
- }
-
- private LongMutable current() {
- LongMutable l = current.get();
- if (l == null) {
- l = new LongMutable();
- current.set(l);
- }
- return l;
+ public void serialize(Long value, ByteBuffer out) {
+ out.putLong(value);
}
@Override
- public LongMutable deserialize(ByteBuffer in) {
- LongMutable l = current();
- l.set(in.getLong());
- return l;
+ public Long deserialize(ByteBuffer in) {
+ return in.getLong();
}
@Override
@@ -67,8 +53,8 @@ public class DateTimeSerializer extends DataTypeSerializer<LongMutable> {
}
@Override
- public LongMutable valueOf(String str) {
- return new LongMutable(DateFormat.stringToMillis(str));
+ public Long valueOf(String str) {
+ return Long.valueOf(DateFormat.stringToMillis(str));
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java
index 976dc51..20cfff3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java
@@ -22,33 +22,19 @@ import java.nio.ByteBuffer;
/**
*/
-public class DoubleSerializer extends DataTypeSerializer<DoubleMutable> {
-
- // be thread-safe and avoid repeated obj creation
- private ThreadLocal<DoubleMutable> current = new ThreadLocal<DoubleMutable>();
+public class DoubleSerializer extends DataTypeSerializer<Double> {
public DoubleSerializer(DataType type) {
}
@Override
- public void serialize(DoubleMutable value, ByteBuffer out) {
- out.putDouble(value.get());
- }
-
- private DoubleMutable current() {
- DoubleMutable d = current.get();
- if (d == null) {
- d = new DoubleMutable();
- current.set(d);
- }
- return d;
+ public void serialize(Double value, ByteBuffer out) {
+ out.putDouble(value);
}
@Override
- public DoubleMutable deserialize(ByteBuffer in) {
- DoubleMutable d = current();
- d.set(in.getDouble());
- return d;
+ public Double deserialize(ByteBuffer in) {
+ return in.getDouble();
}
@Override
@@ -67,7 +53,7 @@ public class DoubleSerializer extends DataTypeSerializer<DoubleMutable> {
}
@Override
- public DoubleMutable valueOf(String str) {
- return new DoubleMutable(Double.parseDouble(str));
+ public Double valueOf(String str) {
+ return Double.parseDouble(str);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Int4Serializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Int4Serializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Int4Serializer.java
index 7b95505..0e82e11 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Int4Serializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Int4Serializer.java
@@ -18,39 +18,25 @@
package org.apache.kylin.metadata.datatype;
-import java.nio.ByteBuffer;
-
import org.apache.kylin.common.util.BytesUtil;
+import java.nio.ByteBuffer;
+
/**
*/
-public class Int4Serializer extends DataTypeSerializer<IntMutable> {
-
- // be thread-safe and avoid repeated obj creation
- private ThreadLocal<IntMutable> current = new ThreadLocal<IntMutable>();
+public class Int4Serializer extends DataTypeSerializer<Integer> {
public Int4Serializer(DataType type) {
}
@Override
- public void serialize(IntMutable value, ByteBuffer out) {
- BytesUtil.writeUnsigned(value.get(), 4, out);
- }
-
- private IntMutable current() {
- IntMutable l = current.get();
- if (l == null) {
- l = new IntMutable();
- current.set(l);
- }
- return l;
+ public void serialize(Integer value, ByteBuffer out) {
+ BytesUtil.writeUnsigned(value, 4, out);
}
@Override
- public IntMutable deserialize(ByteBuffer in) {
- IntMutable l = current();
- l.set(BytesUtil.readUnsigned(in, 4));
- return l;
+ public Integer deserialize(ByteBuffer in) {
+ return BytesUtil.readUnsigned(in, 4);
}
@Override
@@ -69,7 +55,7 @@ public class Int4Serializer extends DataTypeSerializer<IntMutable> {
}
@Override
- public IntMutable valueOf(String str) {
- return new IntMutable(Integer.parseInt(str));
+ public Integer valueOf(String str) {
+ return Integer.parseInt(str);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Long8Serializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Long8Serializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Long8Serializer.java
index fa333b2..7dd5aa7 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Long8Serializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Long8Serializer.java
@@ -18,39 +18,26 @@
package org.apache.kylin.metadata.datatype;
-import java.nio.ByteBuffer;
-
import org.apache.kylin.common.util.BytesUtil;
+import java.nio.ByteBuffer;
+
/**
*/
-public class Long8Serializer extends DataTypeSerializer<LongMutable> {
-
- // be thread-safe and avoid repeated obj creation
- private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>();
+public class Long8Serializer extends DataTypeSerializer<Long> {
public Long8Serializer(DataType type) {
}
@Override
- public void serialize(LongMutable value, ByteBuffer out) {
- BytesUtil.writeLong(value.get(), out);
+ public void serialize(Long value, ByteBuffer out) {
+ BytesUtil.writeLong(value, out);
}
- private LongMutable current() {
- LongMutable l = current.get();
- if (l == null) {
- l = new LongMutable();
- current.set(l);
- }
- return l;
- }
@Override
- public LongMutable deserialize(ByteBuffer in) {
- LongMutable l = current();
- l.set(BytesUtil.readLong(in));
- return l;
+ public Long deserialize(ByteBuffer in) {
+ return BytesUtil.readLong(in);
}
@Override
@@ -69,7 +56,7 @@ public class Long8Serializer extends DataTypeSerializer<LongMutable> {
}
@Override
- public LongMutable valueOf(String str) {
- return new LongMutable(Long.parseLong(str));
+ public Long valueOf(String str) {
+ return Long.parseLong(str);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java
index 9306a70..605dcd7 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java
@@ -18,39 +18,25 @@
package org.apache.kylin.metadata.datatype;
-import java.nio.ByteBuffer;
-
import org.apache.kylin.common.util.BytesUtil;
+import java.nio.ByteBuffer;
+
/**
*/
-public class LongSerializer extends DataTypeSerializer<LongMutable> {
-
- // be thread-safe and avoid repeated obj creation
- private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>();
+public class LongSerializer extends DataTypeSerializer<Long> {
public LongSerializer(DataType type) {
}
@Override
- public void serialize(LongMutable value, ByteBuffer out) {
- BytesUtil.writeVLong(value.get(), out);
- }
-
- private LongMutable current() {
- LongMutable l = current.get();
- if (l == null) {
- l = new LongMutable();
- current.set(l);
- }
- return l;
+ public void serialize(Long value, ByteBuffer out) {
+ BytesUtil.writeVLong(value, out);
}
@Override
- public LongMutable deserialize(ByteBuffer in) {
- LongMutable l = current();
- l.set(BytesUtil.readVLong(in));
- return l;
+ public Long deserialize(ByteBuffer in) {
+ return BytesUtil.readVLong(in);
}
@Override
@@ -75,7 +61,7 @@ public class LongSerializer extends DataTypeSerializer<LongMutable> {
}
@Override
- public LongMutable valueOf(String str) {
- return new LongMutable(Long.parseLong(str));
+ public Long valueOf(String str) {
+ return Long.parseLong(str);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index 184c290..a49d982 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -18,27 +18,27 @@
package org.apache.kylin.metadata.model;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.measure.MeasureTypeFactory;
-import org.apache.kylin.measure.basic.BasicMeasureType;
-import org.apache.kylin.metadata.datatype.DataType;
-
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.MeasureTypeFactory;
+import org.apache.kylin.measure.basic.BasicMeasureType;
+import org.apache.kylin.metadata.datatype.DataType;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
/**
*/
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class FunctionDesc {
+public class FunctionDesc implements Serializable {
public static FunctionDesc newInstance(String expression, ParameterDesc param, String returnType) {
FunctionDesc r = new FunctionDesc();
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java
index 6489244..dd1500b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java
@@ -18,17 +18,18 @@
package org.apache.kylin.metadata.model;
-import java.util.Arrays;
-
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import java.util.Arrays;
+
/**
*/
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class JoinDesc {
+public class JoinDesc implements Serializable {
// inner, left, right, outer...
@JsonProperty("type")
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java
index 513217e..51e5787 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java
@@ -28,7 +28,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.Serializable;
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class JoinTableDesc implements Serializable{
+public class JoinTableDesc implements Serializable {
@JsonProperty("table")
private String table;
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
index a0b267d..c132d0e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
@@ -18,15 +18,16 @@
package org.apache.kylin.metadata.model;
+import com.google.common.base.Preconditions;
+
+import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import com.google.common.base.Preconditions;
-
-public class JoinsTree {
+public class JoinsTree implements Serializable {
final Map<String, Chain> tableChains = new LinkedHashMap<>();
@@ -111,7 +112,7 @@ public class JoinsTree {
return chain.join;
}
- static class Chain {
+ static class Chain implements java.io.Serializable {
TableRef table; // pk side
JoinDesc join;
Chain fkSide;
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
index 6ddbbf4..deec4f2 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
@@ -22,15 +22,15 @@ import java.io.Serializable;
import java.util.Objects;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
*/
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class MeasureDesc implements Serializable{
+public class MeasureDesc implements Serializable {
@JsonProperty("name")
private String name;
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/metadata/model/ModelDimensionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ModelDimensionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ModelDimensionDesc.java
index bd3dfef..d14a56b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ModelDimensionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ModelDimensionDesc.java
@@ -29,7 +29,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
/**
*/
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class ModelDimensionDesc implements Serializable{
+public class ModelDimensionDesc implements Serializable {
@JsonProperty("table")
private String table;
@JsonProperty("columns")
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
index c14d061..8ad20a8 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
@@ -18,21 +18,22 @@
package org.apache.kylin.metadata.model;
-import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
-import java.util.List;
-
import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.List;
+
/**
*/
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class ParameterDesc {
+public class ParameterDesc implements Serializable {
public static ParameterDesc newInstance(Object... objs) {
if (objs.length == 0)
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
index bab3ec3..c6e6425 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
@@ -32,7 +32,7 @@ import java.io.Serializable;
/**
*/
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class PartitionDesc implements Serializable{
+public class PartitionDesc implements Serializable {
public static enum PartitionType {
APPEND, //
@@ -177,7 +177,7 @@ public class PartitionDesc implements Serializable{
String buildDateRangeCondition(PartitionDesc partDesc, long startInclusive, long endExclusive);
}
- public static class DefaultPartitionConditionBuilder implements IPartitionConditionBuilder {
+ public static class DefaultPartitionConditionBuilder implements IPartitionConditionBuilder, java.io.Serializable {
@Override
public String buildDateRangeCondition(PartitionDesc partDesc, long startInclusive, long endExclusive) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
index 9837ff4..6f15f3c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
@@ -18,26 +18,28 @@
package org.apache.kylin.metadata.model;
+import com.google.common.collect.Maps;
+
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
-import com.google.common.collect.Maps;
-
public class TableRef implements Serializable{
- final private DataModelDesc model;
+ final transient private DataModelDesc model;
final private String alias;
final private TableDesc table;
final private Map<String, TblColRef> columns;
+ final private String modelName;
TableRef(DataModelDesc model, String alias, TableDesc table) {
this.model = model;
+ this.modelName = model.getName();
this.alias = alias;
this.table = table;
this.columns = Maps.newLinkedHashMap();
-
+
for (ColumnDesc col : table.getColumns()) {
columns.put(col.getName(), new TblColRef(this, col));
}
@@ -95,7 +97,7 @@ public class TableRef implements Serializable{
TableRef t = (TableRef) o;
- if ((model == null ? t.model == null : model.getName().equals(t.model.getName())) == false)
+ if ((modelName == null ? t.modelName != null : modelName.equals(t.modelName)) == false)
return false;
if ((alias == null ? t.alias == null : alias.equals(t.alias)) == false)
return false;
@@ -108,7 +110,7 @@ public class TableRef implements Serializable{
@Override
public int hashCode() {
int result = 0;
- result = 31 * result + model.getName().hashCode();
+ result = 31 * result + modelName.hashCode();
result = 31 * result + alias.hashCode();
result = 31 * result + table.getIdentity().hashCode();
return result;
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java
index 40e5b29..3d48ac2 100644
--- a/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java
@@ -38,8 +38,6 @@ import org.apache.kylin.measure.extendedcolumn.ExtendedColumnMeasureType;
import org.apache.kylin.measure.hllc.HLLCAggregator;
import org.apache.kylin.measure.hllc.HLLCounter;
import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.datatype.DoubleMutable;
-import org.apache.kylin.metadata.datatype.LongMutable;
import org.github.jamm.MemoryMeter;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -61,7 +59,7 @@ public class AggregatorMemEstimateTest extends LocalFileMetadataTestCase {
}
private List<? extends MeasureAggregator> basicAggregators() {
- LongMutable longVal = new LongMutable(1000);
+ Long longVal = new Long(1000);
LongMinAggregator longMin = new LongMinAggregator();
LongMaxAggregator longMax = new LongMaxAggregator();
LongSumAggregator longSum = new LongSumAggregator();
@@ -69,7 +67,7 @@ public class AggregatorMemEstimateTest extends LocalFileMetadataTestCase {
longMax.aggregate(longVal);
longSum.aggregate(longVal);
- DoubleMutable doubleVal = new DoubleMutable(1.0);
+ Double doubleVal = new Double(1.0);
DoubleMinAggregator doubleMin = new DoubleMinAggregator();
DoubleMaxAggregator doubleMax = new DoubleMaxAggregator();
DoubleSumAggregator doubleSum = new DoubleSumAggregator();
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
index dde0481..fcd0182 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
@@ -52,7 +52,6 @@ import org.apache.kylin.gridtable.GridTable;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.gridtable.memstore.GTSimpleMemStore;
import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.datatype.LongMutable;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
import org.apache.kylin.metadata.filter.ConstantTupleFilter;
@@ -499,16 +498,16 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
GTRecord r = new GTRecord(table.getInfo());
GTBuilder builder = table.rebuild();
- builder.write(r.setValues("2015-01-14", "30", "Yang", new LongMutable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-14", "30", "Luke", new LongMutable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-15", "20", "Dong", new LongMutable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-15", "20", "Jason", new LongMutable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-15", "30", "Xu", new LongMutable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-16", "20", "Mahone", new LongMutable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-16", "20", "Qianhao", new LongMutable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-16", "30", "George", new LongMutable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new LongMutable(10), new BigDecimal("10.5")));
- builder.write(r.setValues("2015-01-17", "10", "Kejia", new LongMutable(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-14", "30", "Yang", new Long(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-14", "30", "Luke", new Long(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-15", "20", "Dong", new Long(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-15", "20", "Jason", new Long(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-15", "30", "Xu", new Long(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-16", "20", "Mahone", new Long(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-16", "20", "Qianhao", new Long(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-16", "30", "George", new Long(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new Long(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-17", "10", "Kejia", new Long(10), new BigDecimal("10.5")));
builder.close();
return table;
@@ -524,34 +523,34 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
for (int i = 0; i < 100000; i++) {
for (int j = 0; j < 10; j++)
- builder.write(r.setValues("2015-01-14", "30", "Yang", new LongMutable(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-14", "30", "Yang", new Long(10), new BigDecimal("10.5")));
for (int j = 0; j < 10; j++)
- builder.write(r.setValues("2015-01-14", "30", "Luke", new LongMutable(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-14", "30", "Luke", new Long(10), new BigDecimal("10.5")));
for (int j = 0; j < 10; j++)
- builder.write(r.setValues("2015-01-15", "20", "Dong", new LongMutable(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-15", "20", "Dong", new Long(10), new BigDecimal("10.5")));
for (int j = 0; j < 10; j++)
- builder.write(r.setValues("2015-01-15", "20", "Jason", new LongMutable(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-15", "20", "Jason", new Long(10), new BigDecimal("10.5")));
for (int j = 0; j < 10; j++)
- builder.write(r.setValues("2015-01-15", "30", "Xu", new LongMutable(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-15", "30", "Xu", new Long(10), new BigDecimal("10.5")));
for (int j = 0; j < 10; j++)
- builder.write(r.setValues("2015-01-16", "20", "Mahone", new LongMutable(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-16", "20", "Mahone", new Long(10), new BigDecimal("10.5")));
for (int j = 0; j < 10; j++)
- builder.write(r.setValues("2015-01-16", "20", "Qianhao", new LongMutable(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-16", "20", "Qianhao", new Long(10), new BigDecimal("10.5")));
for (int j = 0; j < 10; j++)
- builder.write(r.setValues("2015-01-16", "30", "George", new LongMutable(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-16", "30", "George", new Long(10), new BigDecimal("10.5")));
for (int j = 0; j < 10; j++)
- builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new LongMutable(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new Long(10), new BigDecimal("10.5")));
for (int j = 0; j < 10; j++)
- builder.write(r.setValues("2015-01-17", "10", "Kejia", new LongMutable(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-17", "10", "Kejia", new Long(10), new BigDecimal("10.5")));
}
builder.close();
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 1e3f078..700f821 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -31,6 +31,7 @@ import org.apache.kylin.engine.mr.steps.NDCuboidJob;
import org.apache.kylin.engine.mr.steps.SaveStatisticsStep;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,7 +97,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
return result;
}
- private MapReduceExecutable createInMemCubingStep(String jobId, String cuboidRootPath) {
+ protected AbstractExecutable createInMemCubingStep(String jobId, String cuboidRootPath) {
// base cuboid job
MapReduceExecutable cubeStep = new MapReduceExecutable();
@@ -113,7 +114,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
cubeStep.setMapReduceParams(cmd.toString());
cubeStep.setMapReduceJobClass(getInMemCuboidJob());
- cubeStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
+// cubeStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
return cubeStep;
}
@@ -140,7 +141,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
baseCuboidStep.setMapReduceParams(cmd.toString());
baseCuboidStep.setMapReduceJobClass(getBaseCuboidJob());
- baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
+// baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
return baseCuboidStep;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
index 3119c1e..7665350 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
public class HadoopUtil {
@SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(HadoopUtil.class);
- private static final ThreadLocal<Configuration> hadoopConfig = new ThreadLocal<>();
+ private static final transient ThreadLocal<Configuration> hadoopConfig = new ThreadLocal<>();
public static void setCurrentConfiguration(Configuration conf) {
hadoopConfig.set(conf);
http://git-wip-us.apache.org/repos/asf/kylin/blob/46ea7562/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 86ac880..14252ee 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -75,6 +75,7 @@ public class JobBuilderSupport {
appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Fact_Distinct_Columns_" + seg.getRealization().getName() + "_Step");
appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
result.setMapReduceParams(cmd.toString());
+ result.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
return result;
}
@@ -204,4 +205,13 @@ public class JobBuilderSupport {
return paths;
}
+ public static String getCuboidOutputPathsByLevel(String cuboidRootPath, int level) {
+ if (level == 0) {
+ return cuboidRootPath + "base_cuboid";
+ } else {
+ return cuboidRootPath + level + "level_cuboid";
+ }
+ }
+
+
}