You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/27 10:01:06 UTC
[11/13] incubator-kylin git commit: KYLIN-976 Add ingester;
Build part done, in-mem cube test pass
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java
deleted file mode 100644
index 7430c4e..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.basic;
-
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.datatype.DoubleMutable;
-
-/**
- */
-@SuppressWarnings("serial")
-public class DoubleMinAggregator extends MeasureAggregator<DoubleMutable> {
-
- DoubleMutable min = null;
-
- @Override
- public void reset() {
- min = null;
- }
-
- @Override
- public void aggregate(DoubleMutable value) {
- if (min == null)
- min = new DoubleMutable(value.get());
- else if (min.get() > value.get())
- min.set(value.get());
- }
-
- @Override
- public DoubleMutable getState() {
- return min;
- }
-
- @Override
- public int getMemBytesEstimate() {
- return guessDoubleMemBytes();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java
deleted file mode 100644
index 6e66c1b..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.basic;
-
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.datatype.DoubleMutable;
-
-/**
- */
-@SuppressWarnings("serial")
-public class DoubleSumAggregator extends MeasureAggregator<DoubleMutable> {
-
- DoubleMutable sum = new DoubleMutable();
-
- @Override
- public void reset() {
- sum.set(0.0);
- }
-
- @Override
- public void aggregate(DoubleMutable value) {
- sum.set(sum.get() + value.get());
- }
-
- @Override
- public DoubleMutable getState() {
- return sum;
- }
-
- @Override
- public int getMemBytesEstimate() {
- return guessDoubleMemBytes();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java
deleted file mode 100644
index 7fdf3d8..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.basic;
-
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.datatype.LongMutable;
-
-/**
- */
-@SuppressWarnings("serial")
-public class LongMaxAggregator extends MeasureAggregator<LongMutable> {
-
- LongMutable max = null;
-
- @Override
- public void reset() {
- max = null;
- }
-
- @Override
- public void aggregate(LongMutable value) {
- if (max == null)
- max = new LongMutable(value.get());
- else if (max.get() < value.get())
- max.set(value.get());
- }
-
- @Override
- public LongMutable getState() {
- return max;
- }
-
- @Override
- public int getMemBytesEstimate() {
- return guessLongMemBytes();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java
deleted file mode 100644
index 22ae865..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.basic;
-
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.datatype.LongMutable;
-
-/**
- */
-@SuppressWarnings("serial")
-public class LongMinAggregator extends MeasureAggregator<LongMutable> {
-
- LongMutable min = null;
-
- @Override
- public void reset() {
- min = null;
- }
-
- @Override
- public void aggregate(LongMutable value) {
- if (min == null)
- min = new LongMutable(value.get());
- else if (min.get() > value.get())
- min.set(value.get());
- }
-
- @Override
- public LongMutable getState() {
- return min;
- }
-
- @Override
- public int getMemBytesEstimate() {
- return guessLongMemBytes();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java
deleted file mode 100644
index 38d728a..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.basic;
-
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.datatype.LongMutable;
-
-/**
- */
-@SuppressWarnings("serial")
-public class LongSumAggregator extends MeasureAggregator<LongMutable> {
-
- LongMutable sum = new LongMutable();
-
- @Override
- public void reset() {
- sum.set(0);
- }
-
- @Override
- public void aggregate(LongMutable value) {
- sum.set(sum.get() + value.get());
- }
-
- @Override
- public LongMutable getState() {
- return sum;
- }
-
- @Override
- public int getMemBytesEstimate() {
- return guessLongMemBytes();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregation.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregation.java b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregation.java
deleted file mode 100644
index d5ceba5..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregation.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.hllc;
-
-import java.util.List;
-
-import org.apache.kylin.aggregation.AggregationType;
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.datatype.DataType;
-import org.apache.kylin.common.datatype.DataTypeSerializer;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-
-public class HLLCAggregation extends AggregationType {
-
- private final DataType dataType;
-
- public HLLCAggregation(String dataType) {
- this.dataType = DataType.getType(dataType);
-
- if (this.dataType.getPrecision() < 10 || this.dataType.getPrecision() > 16)
- throw new IllegalArgumentException("HLLC precision must be between 10 and 16");
- }
-
- @Override
- public DataType getAggregationDataType() {
- return dataType;
- }
-
- @Override
- public Class<? extends DataTypeSerializer<?>> getAggregationDataSeralizer() {
- return HLLCSerializer.class;
- }
-
- @Override
- public void validate(MeasureDesc measureDesc) throws IllegalArgumentException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public MeasureAggregator<?> newAggregator() {
- if (dataType.isHLLC())
- return new HLLCAggregator(dataType.getPrecision());
- else
- return new LDCAggregator();
- }
-
- @Override
- public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc) {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts) {
- // TODO Auto-generated method stub
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregationFactory.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregationFactory.java b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregationFactory.java
deleted file mode 100644
index 18c021d..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregationFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.hllc;
-
-import org.apache.kylin.aggregation.AggregationType;
-import org.apache.kylin.aggregation.IAggregationFactory;
-import org.apache.kylin.metadata.model.FunctionDesc;
-
-public class HLLCAggregationFactory implements IAggregationFactory {
-
- @Override
- public AggregationType createAggregationType(String funcName, String dataType) {
- if (FunctionDesc.FUNC_COUNT_DISTINCT.equalsIgnoreCase(funcName) == false)
- throw new IllegalArgumentException();
-
- return new HLLCAggregation(dataType);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregator.java
deleted file mode 100644
index 8f85fe8..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregator.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.hllc;
-
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-
-/**
- */
-@SuppressWarnings("serial")
-public class HLLCAggregator extends MeasureAggregator<HyperLogLogPlusCounter> {
-
- final int precision;
- HyperLogLogPlusCounter sum = null;
-
- public HLLCAggregator(int precision) {
- this.precision = precision;
- }
-
- @Override
- public void reset() {
- sum = null;
- }
-
- @Override
- public void aggregate(HyperLogLogPlusCounter value) {
- if (sum == null)
- sum = new HyperLogLogPlusCounter(value);
- else
- sum.merge(value);
- }
-
- @Override
- public HyperLogLogPlusCounter getState() {
- return sum;
- }
-
- @Override
- public int getMemBytesEstimate() {
- // 1024 + 60 returned by AggregationCacheMemSizeTest
- return 8 // aggregator obj shell
- + 4 // precision
- + 8 // ref to HLLC
- + 8 // HLLC obj shell
- + 32 + (1 << precision); // HLLC internal
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java
deleted file mode 100644
index 5612892..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.hllc;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.common.datatype.DataType;
-import org.apache.kylin.common.datatype.DataTypeSerializer;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-
-/**
- * @author yangli9
- *
- */
-public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounter> {
-
- // be thread-safe and avoid repeated obj creation
- private ThreadLocal<HyperLogLogPlusCounter> current = new ThreadLocal<HyperLogLogPlusCounter>();
-
- private int precision;
-
- public HLLCSerializer(DataType type) {
- this.precision = type.getPrecision();
- }
-
- @Override
- public void serialize(HyperLogLogPlusCounter value, ByteBuffer out) {
- try {
- value.writeRegisters(out);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- private HyperLogLogPlusCounter current() {
- HyperLogLogPlusCounter hllc = current.get();
- if (hllc == null) {
- hllc = new HyperLogLogPlusCounter(precision);
- current.set(hllc);
- }
- return hllc;
- }
-
- @Override
- public HyperLogLogPlusCounter deserialize(ByteBuffer in) {
- HyperLogLogPlusCounter hllc = current();
- try {
- hllc.readRegisters(in);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return hllc;
- }
-
- @Override
- public int peekLength(ByteBuffer in) {
- return current().peekLength(in);
- }
-
- @Override
- public int maxLength() {
- return current().maxLength();
- }
-
- @Override
- public int getStorageBytesEstimate() {
- return current().maxLength();
- }
-
- @Override
- public HyperLogLogPlusCounter valueOf(byte[] value) {
- HyperLogLogPlusCounter hllc = current();
- hllc.clear();
- if (value == null)
- hllc.add("__nUlL__");
- else
- hllc.add(value);
- return hllc;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java
deleted file mode 100644
index 151c1ee..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.hllc;
-
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.datatype.LongMutable;
-
-/**
- * Long Distinct Count
- */
-@SuppressWarnings("serial")
-public class LDCAggregator extends MeasureAggregator<LongMutable> {
-
- private static LongMutable ZERO = new LongMutable(0);
-
- private HLLCAggregator hllAgg = null;
- private LongMutable state = new LongMutable(0);
-
- @SuppressWarnings("rawtypes")
- public void setDependentAggregator(MeasureAggregator agg) {
- this.hllAgg = (HLLCAggregator) agg;
- }
-
- @Override
- public void reset() {
- }
-
- @Override
- public void aggregate(LongMutable value) {
- }
-
- @Override
- public LongMutable getState() {
- if (hllAgg == null) {
- return ZERO;
- } else {
- state.set(hllAgg.getState().getCountEstimate());
- return state;
- }
- }
-
- @Override
- public int getMemBytesEstimate() {
- return guessLongMemBytes();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregation.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregation.java b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregation.java
deleted file mode 100644
index 251abd9..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregation.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.topn;
-
-import java.util.List;
-
-import org.apache.kylin.aggregation.AggregationType;
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.aggregation.hllc.HLLCSerializer;
-import org.apache.kylin.common.datatype.DataType;
-import org.apache.kylin.common.datatype.DataTypeSerializer;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-
-public class TopNAggregation extends AggregationType {
-
- private final DataType dataType;
-
- public TopNAggregation(String dataType) {
- this.dataType = DataType.getType(dataType);
-
- if (this.dataType.getPrecision() < 1 || this.dataType.getPrecision() > 1000)
- throw new IllegalArgumentException("TopN precision must be between 1 and 1000");
- }
-
- @Override
- public DataType getAggregationDataType() {
- return dataType;
- }
-
- @Override
- public Class<? extends DataTypeSerializer<?>> getAggregationDataSeralizer() {
- return HLLCSerializer.class;
- }
-
- @Override
- public void validate(MeasureDesc measureDesc) throws IllegalArgumentException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public MeasureAggregator<?> newAggregator() {
- return new TopNAggregator();
- }
-
- @Override
- public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc) {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts) {
- // TODO Auto-generated method stub
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregationFactory.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregationFactory.java b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregationFactory.java
deleted file mode 100644
index 1ea22c8..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregationFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.topn;
-
-import org.apache.kylin.aggregation.AggregationType;
-import org.apache.kylin.aggregation.IAggregationFactory;
-import org.apache.kylin.metadata.model.FunctionDesc;
-
-public class TopNAggregationFactory implements IAggregationFactory {
-
- @Override
- public AggregationType createAggregationType(String funcName, String dataType) {
- if (FunctionDesc.FUNC_TOP_N.equalsIgnoreCase(funcName) == false)
- throw new IllegalArgumentException();
-
- return new TopNAggregation(dataType);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregator.java
deleted file mode 100644
index 4f6c7ee..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregator.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.topn;
-
-import java.util.Map;
-
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.topn.TopNCounter;
-import org.apache.kylin.common.util.ByteArray;
-
-import com.google.common.collect.Maps;
-
-/**
- *
- */
-@SuppressWarnings("serial")
-public class TopNAggregator extends MeasureAggregator<TopNCounter<ByteArray>> {
-
- int capacity = 0;
- TopNCounter<ByteArray> sum = null;
- Map<ByteArray, Double> sanityCheckMap;
-
- @Override
- public void reset() {
- sum = null;
- }
-
- @Override
- public void aggregate(TopNCounter<ByteArray> value) {
- if (sum == null) {
- capacity = value.getCapacity();
- sum = new TopNCounter<>(capacity);
- sanityCheckMap = Maps.newHashMap();
- }
- sum.merge(value);
- }
-
- @Override
- public TopNCounter<ByteArray> getState() {
-
- //sum.retain(capacity);
- return sum;
- }
-
- @Override
- public int getMemBytesEstimate() {
- return 8 * capacity / 4;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java
deleted file mode 100644
index 8088842..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.topn;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.kylin.common.datatype.DataType;
-import org.apache.kylin.common.datatype.DataTypeSerializer;
-import org.apache.kylin.common.topn.Counter;
-import org.apache.kylin.common.topn.DoubleDeltaSerializer;
-import org.apache.kylin.common.topn.TopNCounter;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.BytesUtil;
-
-/**
- *
- */
-public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteArray>> {
-
- private DoubleDeltaSerializer dds = new DoubleDeltaSerializer(3);
-
- private int precision;
-
- public TopNCounterSerializer(DataType dataType) {
- this.precision = dataType.getPrecision();
- }
-
- @Override
- public int peekLength(ByteBuffer in) {
- int mark = in.position();
- @SuppressWarnings("unused")
- int capacity = in.getInt();
- int size = in.getInt();
- int keyLength = in.getInt();
- dds.deserialize(in);
- int len = in.position() - mark + keyLength * size;
- in.position(mark);
- return len;
- }
-
- @Override
- public int maxLength() {
- return precision * TopNCounter.EXTRA_SPACE_RATE * (4 + 8);
- }
-
- @Override
- public int getStorageBytesEstimate() {
- return precision * TopNCounter.EXTRA_SPACE_RATE * 8;
- }
-
- @Override
- public TopNCounter<ByteArray> valueOf(byte[] value) {
- ByteBuffer buffer = ByteBuffer.wrap(value);
- int sizeOfId = buffer.getInt();
- int keyEncodedValue = buffer.getInt();
- double counter = buffer.getDouble();
-
- ByteArray key = new ByteArray(sizeOfId);
- BytesUtil.writeUnsigned(keyEncodedValue, key.array(), 0, sizeOfId);
-
- TopNCounter<ByteArray> topNCounter = new TopNCounter<ByteArray>(precision * TopNCounter.EXTRA_SPACE_RATE);
- topNCounter.offer(key, counter);
- return topNCounter;
- }
-
- @Override
- public void serialize(TopNCounter<ByteArray> value, ByteBuffer out) {
- double[] counters = value.getCounters();
- List<ByteArray> peek = value.peek(1);
- int keyLength = peek.size() > 0 ? peek.get(0).length() : 0;
- out.putInt(value.getCapacity());
- out.putInt(value.size());
- out.putInt(keyLength);
- dds.serialize(counters, out);
- Iterator<Counter<ByteArray>> iterator = value.iterator();
- while (iterator.hasNext()) {
- out.put(iterator.next().getItem().array());
- }
- }
-
- @Override
- public TopNCounter<ByteArray> deserialize(ByteBuffer in) {
- int capacity = in.getInt();
- int size = in.getInt();
- int keyLength = in.getInt();
- double[] counters = dds.deserialize(in);
-
- TopNCounter<ByteArray> counter = new TopNCounter<ByteArray>(capacity);
- ByteArray byteArray;
- for (int i = 0; i < size; i++) {
- byteArray = new ByteArray(keyLength);
- in.get(byteArray.array());
- counter.offerToHead(byteArray, counters[i]);
- }
-
- return counter;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 4592b15..d909a4c 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -38,10 +38,10 @@ import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.restclient.Broadcaster;
import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.DictionaryInfo;
import org.apache.kylin.dict.DictionaryManager;
import org.apache.kylin.dict.DistinctColumnValuesProvider;
@@ -182,7 +182,8 @@ public class CubeManager implements IRealizationProvider {
/**
* return null if no dictionary for given column
*/
- public Dictionary<?> getDictionary(CubeSegment cubeSeg, TblColRef col) {
+ @SuppressWarnings("unchecked")
+ public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) {
DictionaryInfo info = null;
try {
DictionaryManager dictMgr = getDictionaryManager();
@@ -199,7 +200,7 @@ public class CubeManager implements IRealizationProvider {
throw new IllegalStateException("Failed to get dictionary for cube segment" + cubeSeg + ", col" + col, e);
}
- return info.getDictionaryObject();
+ return (Dictionary<String>) info.getDictionaryObject();
}
public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 62df1e9..b29f83a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -25,17 +25,17 @@ import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.IDictionaryAware;
-import org.apache.kylin.metadata.model.IBuildable;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.IStorageAware;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.IRealizationSegment;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
@@ -43,8 +43,6 @@ import com.fasterxml.jackson.annotation.JsonBackReference;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
import com.google.common.collect.Maps;
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.metadata.realization.IRealizationSegment;
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, IRealizationSegment {
@@ -267,6 +265,14 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
public void setStorageLocationIdentifier(String storageLocationIdentifier) {
this.storageLocationIdentifier = storageLocationIdentifier;
}
+
+ public Map<TblColRef, Dictionary<String>> buildDictionaryMap() {
+ Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
+ for (TblColRef col : getCubeDesc().getAllColumnsNeedDictionary()) {
+ result.put(col, (Dictionary<String>) getDictionary(col));
+ }
+ return result;
+ }
@Override
public int getColumnLength(TblColRef col) {
@@ -279,7 +285,7 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
}
@Override
- public Dictionary<?> getDictionary(TblColRef col) {
+ public Dictionary<String> getDictionary(TblColRef col) {
return CubeManager.getInstance(this.getCubeInstance().getConfig()).getDictionary(this, col);
}
@@ -427,4 +433,5 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
public IJoinedFlatTableDesc getJoinedFlatTableDesc() {
return new CubeJoinedFlatTableDesc(this.getCubeDesc(), this);
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
index 3619d69..7f38c26 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
@@ -1,23 +1,21 @@
package org.apache.kylin.cube.gridtable;
-import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.datatype.DataTypeSerializer;
-import org.apache.kylin.common.datatype.StringSerializer;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.gridtable.DefaultGTComparator;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.IGTCodeSystem;
import org.apache.kylin.gridtable.IGTComparator;
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
/**
* defines how column values will be encoded to/ decoded from GTRecord
@@ -111,9 +109,6 @@ public class CubeCodeSystem implements IGTCodeSystem {
if (serializer instanceof DictionarySerializer) {
((DictionarySerializer) serializer).serializeWithRounding(value, roundingFlag, buf);
} else {
- if ((value instanceof String) && (!(serializer instanceof StringSerializer || serializer instanceof FixLenSerializer))) {
- value = serializer.valueOf((String) value);
- }
serializer.serialize(value, buf);
}
}
@@ -176,11 +171,6 @@ public class CubeCodeSystem implements IGTCodeSystem {
}
@Override
- public Object valueOf(byte[] value) {
- throw new UnsupportedOperationException();
- }
-
- @Override
public void serialize(Object value, ByteBuffer out) {
throw new UnsupportedOperationException();
}
@@ -230,10 +220,6 @@ public class CubeCodeSystem implements IGTCodeSystem {
return dictionary.getSizeOfId();
}
- @Override
- public Object valueOf(byte[] value) {
- throw new UnsupportedOperationException();
- }
}
static class FixLenSerializer extends DataTypeSerializer {
@@ -306,16 +292,6 @@ public class CubeCodeSystem implements IGTCodeSystem {
return fixLen;
}
- @Override
- public Object valueOf(byte[] value) {
- try {
- return new String(value, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- // does not happen
- throw new RuntimeException(e);
- }
- }
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
index aa0a530..05fc8a5 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
@@ -3,11 +3,11 @@ package org.apache.kylin.cube.gridtable;
import java.util.List;
import java.util.Map;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.metadata.model.TblColRef;
@@ -16,24 +16,25 @@ import com.google.common.collect.Maps;
@SuppressWarnings("rawtypes")
public class CubeGridTable {
- public static Map<TblColRef, Dictionary<?>> getDimensionToDictionaryMap(CubeSegment cubeSeg, long cuboidId) {
+ public static Map<TblColRef, Dictionary<String>> getDimensionToDictionaryMap(CubeSegment cubeSeg, long cuboidId) {
CubeDesc cubeDesc = cubeSeg.getCubeDesc();
CubeManager cubeMgr = CubeManager.getInstance(cubeSeg.getCubeInstance().getConfig());
// build a dictionary map
- Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
+ Map<TblColRef, Dictionary<String>> dictionaryMap = Maps.newHashMap();
List<TblColRef> dimCols = Cuboid.findById(cubeDesc, cuboidId).getColumns();
for (TblColRef col : dimCols) {
- Dictionary<?> dictionary = cubeMgr.getDictionary(cubeSeg, col);
+ Dictionary<String> dictionary = cubeMgr.getDictionary(cubeSeg, col);
if (dictionary != null) {
dictionaryMap.put(col, dictionary);
}
}
+
return dictionaryMap;
}
public static GTInfo newGTInfo(CubeSegment cubeSeg, long cuboidId) throws NotEnoughGTInfoException {
- Map<TblColRef, Dictionary<?>> dictionaryMap = getDimensionToDictionaryMap(cubeSeg, cuboidId);
+ Map<TblColRef, Dictionary<String>> dictionaryMap = getDimensionToDictionaryMap(cubeSeg, cuboidId);
Cuboid cuboid = Cuboid.findById(cubeSeg.getCubeDesc(), cuboidId);
for (TblColRef dim : cuboid.getColumns()) {
if (cubeSeg.getCubeDesc().getRowkey().isUseDictionary(dim)) {
@@ -47,7 +48,7 @@ public class CubeGridTable {
return newGTInfo(cubeSeg.getCubeDesc(), cuboidId, dictionaryMap);
}
- public static GTInfo newGTInfo(CubeDesc cubeDesc, long cuboidId, Map<TblColRef, Dictionary<?>> dictionaryMap) {
+ public static GTInfo newGTInfo(CubeDesc cubeDesc, long cuboidId, Map<TblColRef, Dictionary<String>> dictionaryMap) {
Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
CuboidToGridTableMapping mapping = new CuboidToGridTableMapping(cuboid);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
index 36db773..2152301 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
@@ -6,11 +6,11 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.kylin.common.datatype.DataType;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.model.HBaseColumnDesc;
import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/NotEnoughGTInfoException.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/NotEnoughGTInfoException.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/NotEnoughGTInfoException.java
index 045b11e..9bbcf75 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/NotEnoughGTInfoException.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/NotEnoughGTInfoException.java
@@ -18,5 +18,6 @@
package org.apache.kylin.cube.gridtable;
+@SuppressWarnings("serial")
public class NotEnoughGTInfoException extends Exception {
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
index d30186e..c4d0a7e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
@@ -23,12 +23,12 @@ package org.apache.kylin.cube.gridtable;
import java.nio.ByteBuffer;
import java.util.Map;
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.datatype.DataTypeSerializer;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.IGTCodeSystem;
import org.apache.kylin.gridtable.IGTComparator;
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
@SuppressWarnings({ "rawtypes", "unchecked" })
public class TrimmedCubeCodeSystem implements IGTCodeSystem {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
index 0a35559..58f94c1 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
@@ -21,8 +21,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.GridTable;
@@ -39,12 +39,12 @@ abstract public class AbstractInMemCubeBuilder {
private static Logger logger = LoggerFactory.getLogger(AbstractInMemCubeBuilder.class);
final protected CubeDesc cubeDesc;
- final protected Map<TblColRef, Dictionary<?>> dictionaryMap;
+ final protected Map<TblColRef, Dictionary<String>> dictionaryMap;
protected int taskThreadCount = 4;
protected int reserveMemoryMB = 200;
- public AbstractInMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
+ public AbstractInMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
if (cubeDesc == null)
throw new NullPointerException();
if (dictionaryMap == null)
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
index ce912a3..5b6131f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
@@ -29,15 +29,15 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.TimeUnit;
-import org.apache.kylin.aggregation.MeasureAggregators;
import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.MemoryBudgetController;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.measure.MeasureAggregators;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,7 +54,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
private int splitRowThreshold = Integer.MAX_VALUE;
private int unitRows = 1000;
- public DoggedCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
+ public DoggedCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
super(cubeDesc, dictionaryMap);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index a393179..5c59de7 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -28,9 +28,9 @@ import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.kylin.common.datatype.DoubleMutable;
import org.apache.kylin.common.topn.Counter;
import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.MemoryBudgetController;
import org.apache.kylin.common.util.Pair;
@@ -38,8 +38,6 @@ import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.gridtable.CubeGridTable;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.gridtable.GTAggregateScanner;
import org.apache.kylin.gridtable.GTBuilder;
import org.apache.kylin.gridtable.GTInfo;
@@ -47,6 +45,7 @@ import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.GridTable;
import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.metadata.datatype.DoubleMutable;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
@@ -66,7 +65,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
private final CuboidScheduler cuboidScheduler;
private final long baseCuboidId;
private final int totalCuboidCount;
- private final CubeJoinedFlatTableDesc intermediateTableDesc;
private final String[] metricsAggrFuncs;
private final MeasureDesc[] measureDescs;
private final int measureCount;
@@ -81,12 +79,11 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
private Object[] totalSumForSanityCheck;
private ICuboidCollector resultCollector;
- public InMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
+ public InMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
super(cubeDesc, dictionaryMap);
this.cuboidScheduler = new CuboidScheduler(cubeDesc);
this.baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
this.totalCuboidCount = cuboidScheduler.getCuboidCount();
- this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
this.measureCount = cubeDesc.getMeasures().size();
this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
@@ -100,8 +97,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]);
}
-
-
private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
GTInfo info = CubeGridTable.newGTInfo(cubeDesc, cuboidID, dictionaryMap);
@@ -114,7 +109,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
return gridTable;
}
-
@Override
public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
ConcurrentNavigableMap<Long, CuboidResult> result = build(input);
@@ -447,7 +441,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
return updateCuboidResult(cuboidId, newGridTable, count, timeSpent, 0);
}
- //@SuppressWarnings("unused")
+ @SuppressWarnings({ "unused", "rawtypes", "unchecked" })
private void sanityCheck(long parentId, long cuboidId, Object[] totalSum) {
// double sum introduces error and causes result not exactly equal
for (int i = 0; i < totalSum.length; i++) {
@@ -508,9 +502,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
this.info = info;
this.input = input;
this.record = new GTRecord(info);
- this.inMemCubeBuilderInputConverter = new InMemCubeBuilderInputConverter(cubeDesc,
- InMemCubeBuilderUtils.createTopNLiteralColDictionaryMap(cubeDesc, intermediateTableDesc, dictionaryMap),
- info);
+ this.inMemCubeBuilderInputConverter = new InMemCubeBuilderInputConverter(cubeDesc, dictionaryMap, info);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
index bf4278a..fed9479 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
@@ -17,49 +17,39 @@
*/
package org.apache.kylin.cube.inmemcubing;
-import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
-import org.apache.kylin.aggregation.MeasureCodec;
-import org.apache.kylin.common.datatype.LongMutable;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.measure.MeasureIngester;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
+import org.apache.kylin.metadata.model.ParameterDesc;
+import org.apache.kylin.metadata.model.TblColRef;
/**
*/
public class InMemCubeBuilderInputConverter {
- private static final LongMutable ONE = new LongMutable(1l);
-
- private final CubeDesc cubeDesc;
private final CubeJoinedFlatTableDesc intermediateTableDesc;
private final MeasureDesc[] measureDescs;
- private final MeasureCodec measureCodec;
+ private final MeasureIngester<?>[] measureIngesters;
private final int measureCount;
- private final ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
- private final Map<Integer, Dictionary<String>> topNLiteralColDictMap;
+ private final Map<TblColRef, Dictionary<String>> dictionaryMap;
private final GTInfo gtInfo;
- public InMemCubeBuilderInputConverter(CubeDesc cubeDesc, Map<Integer, Dictionary<String>> topNLiteralColDictMap, GTInfo gtInfo) {
- this.cubeDesc = cubeDesc;
+ public InMemCubeBuilderInputConverter(CubeDesc cubeDesc, Map<TblColRef, Dictionary<String>> dictionaryMap, GTInfo gtInfo) {
this.gtInfo = gtInfo;
this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
this.measureCount = cubeDesc.getMeasures().size();
this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
- this.measureCodec = new MeasureCodec(cubeDesc.getMeasures());
- this.topNLiteralColDictMap = Preconditions.checkNotNull(topNLiteralColDictMap, "topNLiteralColDictMap cannot be null");
+ this.measureIngesters = MeasureIngester.create(cubeDesc.getMeasures());
+ this.dictionaryMap = dictionaryMap;
}
public final GTRecord convert(List<String> row) {
@@ -89,59 +79,38 @@ public class InMemCubeBuilderInputConverter {
}
private Object[] buildValue(List<String> row) {
-
Object[] values = new Object[measureCount];
for (int i = 0; i < measureCount; i++) {
- MeasureDesc measureDesc = measureDescs[i];
- int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[i];
- FunctionDesc function = cubeDesc.getMeasures().get(i).getFunction();
- if (flatTableIdx == null) {
- values[i] = measureCodec.getSerializer(i).valueOf(measureDesc.getFunction().getParameter().getValue());
- } else if (function.isCount() || function.isHolisticCountDistinct()) {
+ values[i] = buildValueOf(i, row);
+ }
+ return values;
+ }
+
+ private Object buildValueOf(int idxOfMeasure, List<String> row) {
+ MeasureDesc measure = measureDescs[idxOfMeasure];
+ FunctionDesc function = measure.getFunction();
+ int[] colIdxOnFlatTable = intermediateTableDesc.getMeasureColumnIndexes()[idxOfMeasure];
+
+ int paramCount = function.getParameterCount();
+ String[] inputToMeasure = new String[paramCount];
+
+ // pick up parameter values
+ ParameterDesc param = function.getParameter();
+ int paramColIdx = 0; // index among parameters of column type
+ for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) {
+ String value;
+ if (function.isCount() || function.isHolisticCountDistinct()) {
// note for holistic count distinct, this value will be ignored
- values[i] = ONE;
- } else if (function.isTopN()) {
- // encode the key column with dict, and get the counter column;
- int keyColIndex = flatTableIdx[flatTableIdx.length - 1];
- Dictionary<String> literalColDict = topNLiteralColDictMap.get(keyColIndex);
- int keyColEncoded = literalColDict.getIdFromValue(row.get(keyColIndex));
- valueBuf.clear();
- valueBuf.putInt(literalColDict.getSizeOfId());
- valueBuf.putInt(keyColEncoded);
- if (flatTableIdx.length == 1) {
- // only literalCol, use 1.0 as counter
- valueBuf.putDouble(1.0);
- } else {
- // get the counter column value
- valueBuf.putDouble(Double.valueOf(row.get(flatTableIdx[0])));
- }
-
- values[i] = measureCodec.getSerializer(i).valueOf(valueBuf.array());
-
- } else if (flatTableIdx.length == 1) {
- values[i] = measureCodec.getSerializer(i).valueOf(toBytes(row.get(flatTableIdx[0])));
+ value = "1";
+ } else if (param.isColumnType()) {
+ value = row.get(colIdxOnFlatTable[paramColIdx++]);
} else {
-
- byte[] result = null;
- for (int x = 0; x < flatTableIdx.length; x++) {
- byte[] split = toBytes(row.get(flatTableIdx[x]));
- if (result == null) {
- result = Arrays.copyOf(split, split.length);
- } else {
- byte[] newResult = new byte[result.length + split.length];
- System.arraycopy(result, 0, newResult, 0, result.length);
- System.arraycopy(split, 0, newResult, result.length, split.length);
- result = newResult;
- }
- }
- values[i] = measureCodec.getSerializer(i).valueOf(result);
+ value = param.getValue();
}
+ inputToMeasure[i] = value;
}
- return values;
- }
-
- private byte[] toBytes(String v) {
- return v == null ? null : Bytes.toBytes(v);
+
+ return measureIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java
index 9d819a4..e8fa6d0 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java
@@ -18,11 +18,12 @@
package org.apache.kylin.cube.inmemcubing;
import com.google.common.collect.Maps;
+
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
index 4316376..62432f7 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
@@ -21,10 +21,10 @@ package org.apache.kylin.cube.kv;
import java.util.Map;
import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
index 7fedd90..ba15b48 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
@@ -22,7 +22,7 @@ import java.util.Arrays;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.dict.IDictionaryAware;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java
index a21fe9f..fea3736 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java
@@ -21,7 +21,7 @@ package org.apache.kylin.cube.kv;
import java.util.Collection;
import java.util.Comparator;
-import org.apache.kylin.common.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataType;
/**
* @author yangli9
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index ef563ed..3e8ee13 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -41,6 +41,7 @@ import org.apache.kylin.common.persistence.RootPersistentEntity;
import org.apache.kylin.common.util.Array;
import org.apache.kylin.common.util.CaseInsensitiveStringMap;
import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.ColumnDesc;
@@ -818,7 +819,8 @@ public class CubeDesc extends RootPersistentEntity {
}
for (MeasureDesc measure : measures) {
- result.addAll(measure.getColumnsNeedDictionary());
+ MeasureType aggrType = MeasureType.create(measure.getFunction());
+ result.addAll(aggrType.getColumnsNeedDictionary(measure));
}
return result;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
index 8e36009..1121621 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
@@ -26,12 +26,12 @@ import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.datatype.DataType;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.validation.IValidatorRule;
import org.apache.kylin.cube.model.validation.ResultLevel;
import org.apache.kylin.cube.model.validation.ValidateContext;
import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
index 0cfd020..3e79226 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
@@ -50,9 +50,11 @@ import com.google.common.collect.Maps;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
+
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
@@ -146,7 +148,7 @@ public class CubingUtils {
return result;
}
- public static Map<TblColRef, Dictionary<?>> buildDictionary(final CubeInstance cubeInstance, Iterable<List<String>> recordList) throws IOException {
+ public static Map<TblColRef, Dictionary<String>> buildDictionary(final CubeInstance cubeInstance, Iterable<List<String>> recordList) throws IOException {
final List<TblColRef> columnsNeedToBuildDictionary = cubeInstance.getDescriptor().listDimensionColumnsExcludingDerived();
final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap();
int index = 0;
@@ -154,7 +156,7 @@ public class CubingUtils {
tblColRefMap.put(index++, column);
}
- HashMap<TblColRef, Dictionary<?>> result = Maps.newHashMap();
+ HashMap<TblColRef, Dictionary<String>> result = Maps.newHashMap();
HashMultimap<TblColRef, String> valueMap = HashMultimap.create();
for (List<String> row : recordList) {
@@ -173,18 +175,19 @@ public class CubingUtils {
return input == null ? null : input.getBytes();
}
});
- final Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(tblColRef.getType(), new IterableDictionaryValueEnumerator(bytes));
+ final Dictionary<String> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(tblColRef.getType(), new IterableDictionaryValueEnumerator(bytes));
result.put(tblColRef, dict);
}
return result;
}
- public static Map<TblColRef, Dictionary<?>> writeDictionary(CubeSegment cubeSegment, Map<TblColRef, Dictionary<?>> dictionaryMap, long startOffset, long endOffset) {
- Map<TblColRef, Dictionary<?>> realDictMap = Maps.newHashMap();
+ @SuppressWarnings("unchecked")
+ public static Map<TblColRef, Dictionary<String>> writeDictionary(CubeSegment cubeSegment, Map<TblColRef, Dictionary<String>> dictionaryMap, long startOffset, long endOffset) {
+ Map<TblColRef, Dictionary<String>> realDictMap = Maps.newHashMap();
- for (Map.Entry<TblColRef, Dictionary<?>> entry : dictionaryMap.entrySet()) {
+ for (Map.Entry<TblColRef, Dictionary<String>> entry : dictionaryMap.entrySet()) {
final TblColRef tblColRef = entry.getKey();
- final Dictionary<?> dictionary = entry.getValue();
+ final Dictionary<String> dictionary = entry.getValue();
ReadableTable.TableSignature signature = new ReadableTable.TableSignature();
signature.setLastModifiedTime(System.currentTimeMillis());
signature.setPath(String.format("streaming_%s_%s", startOffset, endOffset));
@@ -195,7 +198,7 @@ public class CubingUtils {
try {
DictionaryInfo realDict = dictionaryManager.trySaveNewDict(dictionary, dictInfo);
cubeSegment.putDictResPath(tblColRef, realDict.getResourcePath());
- realDictMap.put(tblColRef, realDict.getDictionaryObject());
+ realDictMap.put(tblColRef, (Dictionary<String>) realDict.getDictionaryObject());
} catch (IOException e) {
logger.error("error save dictionary for column:" + tblColRef, e);
throw new RuntimeException("error save dictionary for column:" + tblColRef, e);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/gridtable/DefaultGTComparator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/DefaultGTComparator.java b/core-cube/src/main/java/org/apache/kylin/gridtable/DefaultGTComparator.java
index f7623e3..714571f 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/DefaultGTComparator.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/DefaultGTComparator.java
@@ -1,7 +1,7 @@
package org.apache.kylin.gridtable;
import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
public class DefaultGTComparator implements IGTComparator {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index 01696e8..eb8d212 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -7,10 +7,10 @@ import java.util.Iterator;
import java.util.Map.Entry;
import java.util.SortedMap;
-import org.apache.kylin.aggregation.MeasureAggregator;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.measure.MeasureAggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
index 229c679..d3a03d1 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
@@ -5,10 +5,10 @@ import java.util.BitSet;
import java.util.Iterator;
import java.util.LinkedList;
-import org.apache.kylin.common.datatype.DataType;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.cube.gridtable.CubeCodeSystem;
import org.apache.kylin.cube.util.KryoUtils;
+import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.TblColRef;
public class GTInfo {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
index 2783f55..b3133be 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
@@ -2,9 +2,9 @@ package org.apache.kylin.gridtable;
import java.nio.ByteBuffer;
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.datatype.DataTypeSerializer;
import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
@SuppressWarnings({ "rawtypes", "unchecked" })
/**
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java
index 0e61cf2..3a22091 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java
@@ -2,8 +2,8 @@ package org.apache.kylin.gridtable;
import java.nio.ByteBuffer;
-import org.apache.kylin.aggregation.MeasureAggregator;
import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.measure.MeasureAggregator;
public interface IGTCodeSystem {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java b/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
index f8d7f30..ff71b4f 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
@@ -22,11 +22,11 @@ import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
-import org.apache.kylin.common.datatype.DataType;
-import org.apache.kylin.common.datatype.LongMutable;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.gridtable.GTInfo.Builder;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.LongMutable;
public class UnitTestSupport {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java b/core-cube/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
deleted file mode 100644
index 8ae44b6..0000000
--- a/core-cube/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.apache.kylin.aggregation.topn;
-
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.common.datatype.DataType;
-import org.apache.kylin.common.topn.TopNCounter;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- *
- */
-public class TopNCounterSerializerTest {
-
- private static TopNCounterSerializer serializer = new TopNCounterSerializer(DataType.getType("topn(10)"));
-
- @Test
- public void testSerialization() {
- TopNCounter<ByteArray> vs = new TopNCounter<ByteArray>(50);
- Integer[] stream = { 1, 1, 2, 9, 1, 2, 3, 7, 7, 1, 3, 1, 1 };
- for (Integer i : stream) {
- vs.offer(new ByteArray(Bytes.toBytes(i)));
- }
-
- ByteBuffer out = ByteBuffer.allocate(1024);
- serializer.serialize(vs, out);
-
- byte[] copyBytes = new byte[out.position()];
- System.arraycopy(out.array(), 0, copyBytes, 0, out.position());
-
- ByteBuffer in = ByteBuffer.wrap(copyBytes);
- TopNCounter<ByteArray> vsNew = serializer.deserialize(in);
-
- Assert.assertEquals(vs.toString(), vsNew.toString());
-
- }
-
- @Test
- public void testValueOf() {
-
- TopNCounter<ByteArray> origin = new TopNCounter<ByteArray>(10);
- ByteArray key = new ByteArray(1);
- ByteBuffer byteBuffer = key.asBuffer();
- BytesUtil.writeVLong(20l, byteBuffer);
- origin.offer(key, 1.0);
-
- byteBuffer = ByteBuffer.allocate(1024);
- byteBuffer.putInt(1);
- byteBuffer.putInt(20);
- byteBuffer.putDouble(1.0);
- TopNCounter<ByteArray> counter = serializer.valueOf(byteBuffer.array());
-
-
- Assert.assertEquals(origin.toString(), counter.toString());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/test/java/org/apache/kylin/cube/DictionaryManagerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/DictionaryManagerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/DictionaryManagerTest.java
index d7feb56..bce228d 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/DictionaryManagerTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/DictionaryManagerTest.java
@@ -23,10 +23,10 @@ import static org.junit.Assert.assertTrue;
import java.util.HashSet;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.DictionaryInfo;
import org.apache.kylin.dict.DictionaryManager;
import org.apache.kylin.metadata.model.TblColRef;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderStressTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderStressTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderStressTest.java
index 935e840..c25bad7 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderStressTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderStressTest.java
@@ -26,10 +26,10 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.metadata.model.TblColRef;
import org.junit.AfterClass;
@@ -51,7 +51,7 @@ public class DoggedCubeBuilderStressTest extends LocalFileMetadataTestCase {
private static CubeInstance cube;
private static String flatTable;
- private static Map<TblColRef, Dictionary<?>> dictionaryMap;
+ private static Map<TblColRef, Dictionary<String>> dictionaryMap;
@BeforeClass
public static void before() throws IOException {