You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/02/26 10:55:18 UTC
[1/4] kylin git commit: KYLIN-1453 cuboid sharding based on specific column
Repository: kylin
Updated Branches:
refs/heads/2.x-staging 2e1d2f6b6 -> 294fc7078
KYLIN-1453 cuboid sharding based on specific column
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/294fc707
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/294fc707
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/294fc707
Branch: refs/heads/2.x-staging
Commit: 294fc70785ab009e6b4d8a12cbeb609d46f89a93
Parents: 3f5074e
Author: honma <ho...@ebay.com>
Authored: Thu Feb 25 19:02:28 2016 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Feb 26 17:54:37 2016 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/util/BytesUtil.java | 18 +-
.../dict/TupleFilterFunctionTransformer.java | 170 +++++++++++++++++++
.../dict/TupleFilterFunctionTranslator.java | 166 ------------------
.../kylin/metadata/filter/CaseTupleFilter.java | 7 +-
.../metadata/filter/ColumnTupleFilter.java | 11 +-
.../metadata/filter/CompareTupleFilter.java | 10 +-
.../metadata/filter/ConstantTupleFilter.java | 16 +-
.../metadata/filter/DynamicTupleFilter.java | 9 +-
.../metadata/filter/ExtractTupleFilter.java | 7 +-
.../metadata/filter/FunctionTupleFilter.java | 16 +-
.../filter/ITupleFilterTransformer.java | 23 +++
.../metadata/filter/ITupleFilterTranslator.java | 26 ---
.../metadata/filter/LogicalTupleFilter.java | 10 +-
.../kylin/metadata/filter/TupleFilter.java | 5 +-
.../metadata/filter/TupleFilterSerializer.java | 25 ++-
.../common/coprocessor/FilterDecorator.java | 8 +-
.../hbase/cube/v2/CubeSegmentScanner.java | 10 +-
.../common/coprocessor/FilterBaseTest.java | 46 ++++-
.../common/coprocessor/FilterEvaluateTest.java | 4 +-
.../common/coprocessor/FilterSerializeTest.java | 26 ++-
20 files changed, 325 insertions(+), 288 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
index e01ce4f..4e0701c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
@@ -57,7 +57,6 @@ public class BytesUtil {
return integer;
}
-
public static long readLong(ByteBuffer buffer, int size) {
long integer = 0;
for (int i = 0; i < size; i++) {
@@ -133,11 +132,15 @@ public class BytesUtil {
// from WritableUtils
// ============================================================================
+
public static void writeVInt(int i, ByteBuffer out) {
+
writeVLong(i, out);
+
}
public static void writeVLong(long i, ByteBuffer out) {
+
if (i >= -112 && i <= 127) {
out.put((byte) i);
return;
@@ -203,6 +206,8 @@ public class BytesUtil {
}
public static void writeUnsigned(int num, int size, ByteBuffer out) {
+
+
int mask = 0xff << ((size - 1) * 8);
for (int i = size; i > 0; i--) {
int v = (num & mask) >> (i - 1) * 8;
@@ -222,6 +227,7 @@ public class BytesUtil {
}
public static void writeLong(long num, ByteBuffer out) {
+
for (int i = 0; i < 8; i++) {
out.put((byte) num);
num >>>= 8;
@@ -257,6 +263,8 @@ public class BytesUtil {
}
int len = str.length();
BytesUtil.writeVInt(len, out);
+
+
for (int i = 0; i < len; i++) {
out.put((byte) str.charAt(i));
}
@@ -335,7 +343,7 @@ public class BytesUtil {
writeVInt(-1, out);
return;
}
- writeVInt(array.length, out);
+ writeVInt(length, out);
out.put(array, offset, length);
}
@@ -348,7 +356,7 @@ public class BytesUtil {
in.get(array);
return array;
}
-
+
public static int peekByteArrayLength(ByteBuffer in) {
int start = in.position();
int arrayLen = readVInt(in);
@@ -369,6 +377,7 @@ public class BytesUtil {
writeVInt(array.length, out);
byte b_true = (byte) 1;
byte b_false = (byte) 0;
+
for (int i = 0; i < array.length; i++) {
if (array[i])
out.put(b_true);
@@ -428,7 +437,4 @@ public class BytesUtil {
return sb.toString();
}
- public static void main(String[] args) throws Exception {
- }
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterFunctionTransformer.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterFunctionTransformer.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterFunctionTransformer.java
new file mode 100644
index 0000000..096a28d
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterFunctionTransformer.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import java.util.Collection;
+import java.util.ListIterator;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.FunctionTupleFilter;
+import org.apache.kylin.metadata.filter.ITupleFilterTransformer;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Primitives;
+
+/**
+ * only take effect when the compare filter has function
+ */
+public class TupleFilterFunctionTransformer implements ITupleFilterTransformer {
+ public static final Logger logger = LoggerFactory.getLogger(TupleFilterFunctionTransformer.class);
+
+ private IDictionaryAware dictionaryAware;
+
+ public TupleFilterFunctionTransformer(IDictionaryAware dictionaryAware) {
+ this.dictionaryAware = dictionaryAware;
+ }
+
+ @Override
+ public TupleFilter transform(TupleFilter tupleFilter) {
+ TupleFilter translated = null;
+ if (tupleFilter instanceof CompareTupleFilter) {
+ translated = translateCompareTupleFilter((CompareTupleFilter) tupleFilter);
+ if (translated != null) {
+ logger.info("Translated {" + tupleFilter + "} to IN clause: {" + translated + "}");
+ }
+ } else if (tupleFilter instanceof FunctionTupleFilter) {
+ translated = translateFunctionTupleFilter((FunctionTupleFilter) tupleFilter);
+ if (translated != null) {
+ logger.info("Translated {" + tupleFilter + "} to IN clause: {" + translated + "}");
+ }
+ } else if (tupleFilter instanceof LogicalTupleFilter) {
+ ListIterator<TupleFilter> childIterator = (ListIterator<TupleFilter>) tupleFilter.getChildren().listIterator();
+ while (childIterator.hasNext()) {
+ TupleFilter transformed = transform(childIterator.next());
+ if (transformed != null)
+ childIterator.set(transformed);
+ }
+ }
+ return translated == null ? tupleFilter : translated;
+ }
+
+ private TupleFilter translateFunctionTupleFilter(FunctionTupleFilter functionTupleFilter) {
+ if (!functionTupleFilter.isValid())
+ return null;
+
+ TblColRef columnRef = functionTupleFilter.getColumn();
+ Dictionary<?> dict = dictionaryAware.getDictionary(columnRef);
+ if (dict == null)
+ return null;
+
+ CompareTupleFilter translated = new CompareTupleFilter(FilterOperatorEnum.IN);
+ translated.addChild(new ColumnTupleFilter(columnRef));
+
+ try {
+ for (int i = dict.getMinId(); i <= dict.getMaxId(); i++) {
+ Object dictVal = dict.getValueFromId(i);
+ if ((Boolean) functionTupleFilter.invokeFunction(dictVal)) {
+ translated.addChild(new ConstantTupleFilter(dictVal));
+ }
+ }
+ } catch (Exception e) {
+ logger.debug(e.getMessage());
+ return null;
+ }
+ return translated;
+ }
+
+ @SuppressWarnings("unchecked")
+ private TupleFilter translateCompareTupleFilter(CompareTupleFilter compTupleFilter) {
+ if (compTupleFilter.getFunction() == null)
+ return null;
+
+ FunctionTupleFilter functionTupleFilter = compTupleFilter.getFunction();
+ if (!functionTupleFilter.isValid())
+ return null;
+
+ TblColRef columnRef = functionTupleFilter.getColumn();
+ Dictionary<?> dict = dictionaryAware.getDictionary(columnRef);
+ if (dict == null)
+ return null;
+
+ CompareTupleFilter translated = new CompareTupleFilter(FilterOperatorEnum.IN);
+ translated.addChild(new ColumnTupleFilter(columnRef));
+
+ try {
+ Collection<Object> inValues = Lists.newArrayList();
+ for (int i = dict.getMinId(); i <= dict.getMaxId(); i++) {
+ Object dictVal = dict.getValueFromId(i);
+ Object computedVal = functionTupleFilter.invokeFunction(dictVal);
+ Class clazz = Primitives.wrap(computedVal.getClass());
+ Object targetVal = compTupleFilter.getFirstValue();
+ if (Primitives.isWrapperType(clazz))
+ targetVal = clazz.cast(clazz.getDeclaredMethod("valueOf", String.class).invoke(null, compTupleFilter.getFirstValue()));
+
+ int comp = ((Comparable) computedVal).compareTo(targetVal);
+ boolean compResult = false;
+ switch (compTupleFilter.getOperator()) {
+ case EQ:
+ compResult = comp == 0;
+ break;
+ case NEQ:
+ compResult = comp != 0;
+ break;
+ case LT:
+ compResult = comp < 0;
+ break;
+ case LTE:
+ compResult = comp <= 0;
+ break;
+ case GT:
+ compResult = comp > 0;
+ break;
+ case GTE:
+ compResult = comp >= 0;
+ break;
+ case IN:
+ compResult = compTupleFilter.getValues().contains(computedVal.toString());
+ break;
+ case NOTIN:
+ compResult = !compTupleFilter.getValues().contains(computedVal.toString());
+ break;
+ default:
+ break;
+ }
+ if (compResult) {
+ inValues.add(dictVal);
+ }
+ }
+ translated.addChild(new ConstantTupleFilter(inValues));
+ } catch (Exception e) {
+ logger.debug(e.getMessage());
+ return null;
+ }
+ return translated;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterFunctionTranslator.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterFunctionTranslator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterFunctionTranslator.java
deleted file mode 100644
index 1c96dd4..0000000
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterFunctionTranslator.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.dict;
-
-import java.util.ListIterator;
-
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.metadata.filter.ColumnTupleFilter;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.ConstantTupleFilter;
-import org.apache.kylin.metadata.filter.FunctionTupleFilter;
-import org.apache.kylin.metadata.filter.ITupleFilterTranslator;
-import org.apache.kylin.metadata.filter.LogicalTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.primitives.Primitives;
-
-/**
- * only take effect when the compare filter has function
- */
-public class TupleFilterFunctionTranslator implements ITupleFilterTranslator {
- public static final Logger logger = LoggerFactory.getLogger(TupleFilterFunctionTranslator.class);
-
- private IDictionaryAware dictionaryAware;
-
- public TupleFilterFunctionTranslator(IDictionaryAware dictionaryAware) {
- this.dictionaryAware = dictionaryAware;
- }
-
- @Override
- public TupleFilter translate(TupleFilter tupleFilter) {
- TupleFilter translated = null;
- if (tupleFilter instanceof CompareTupleFilter) {
- translated = translateCompareTupleFilter((CompareTupleFilter) tupleFilter);
- if (translated != null) {
- logger.info("Translated {" + tupleFilter + "} to IN clause: {" + translated + "}");
- }
- } else if (tupleFilter instanceof FunctionTupleFilter) {
- translated = translateFunctionTupleFilter((FunctionTupleFilter) tupleFilter);
- if (translated != null) {
- logger.info("Translated {" + tupleFilter + "} to IN clause: {" + translated + "}");
- }
- } else if (tupleFilter instanceof LogicalTupleFilter) {
- ListIterator<TupleFilter> childIterator = (ListIterator<TupleFilter>) tupleFilter.getChildren().listIterator();
- while (childIterator.hasNext()) {
- TupleFilter tempTranslated = translate(childIterator.next());
- if (tempTranslated != null)
- childIterator.set(tempTranslated);
- }
- }
- return translated == null ? tupleFilter : translated;
- }
-
- private TupleFilter translateFunctionTupleFilter(FunctionTupleFilter functionTupleFilter) {
- if (!functionTupleFilter.isValid())
- return null;
-
- TblColRef columnRef = functionTupleFilter.getColumn();
- Dictionary<?> dict = dictionaryAware.getDictionary(columnRef);
- if (dict == null)
- return null;
-
- CompareTupleFilter translated = new CompareTupleFilter(FilterOperatorEnum.IN);
- translated.addChild(new ColumnTupleFilter(columnRef));
-
- try {
- for (int i = dict.getMinId(); i <= dict.getMaxId(); i++) {
- Object dictVal = dict.getValueFromId(i);
- if ((Boolean) functionTupleFilter.invokeFunction(dictVal)) {
- translated.addChild(new ConstantTupleFilter(dictVal));
- }
- }
- } catch (Exception e) {
- logger.debug(e.getMessage());
- return null;
- }
- return translated;
- }
-
- @SuppressWarnings("unchecked")
- private TupleFilter translateCompareTupleFilter(CompareTupleFilter compTupleFilter) {
- if (compTupleFilter.getFunction() == null)
- return null;
-
- FunctionTupleFilter functionTupleFilter = compTupleFilter.getFunction();
- if (!functionTupleFilter.isValid())
- return null;
-
- TblColRef columnRef = functionTupleFilter.getColumn();
- Dictionary<?> dict = dictionaryAware.getDictionary(columnRef);
- if (dict == null)
- return null;
-
- CompareTupleFilter translated = new CompareTupleFilter(FilterOperatorEnum.IN);
- translated.addChild(new ColumnTupleFilter(columnRef));
-
- try {
- for (int i = dict.getMinId(); i <= dict.getMaxId(); i++) {
- Object dictVal = dict.getValueFromId(i);
- Object computedVal = functionTupleFilter.invokeFunction(dictVal);
- Class clazz = Primitives.wrap(computedVal.getClass());
- Object targetVal = compTupleFilter.getFirstValue();
- if (Primitives.isWrapperType(clazz))
- targetVal = clazz.cast(clazz.getDeclaredMethod("valueOf", String.class).invoke(null, compTupleFilter.getFirstValue()));
-
- int comp = ((Comparable) computedVal).compareTo(targetVal);
- boolean compResult = false;
- switch (compTupleFilter.getOperator()) {
- case EQ:
- compResult = comp == 0;
- break;
- case NEQ:
- compResult = comp != 0;
- break;
- case LT:
- compResult = comp < 0;
- break;
- case LTE:
- compResult = comp <= 0;
- break;
- case GT:
- compResult = comp > 0;
- break;
- case GTE:
- compResult = comp >= 0;
- break;
- case IN:
- compResult = compTupleFilter.getValues().contains(computedVal.toString());
- break;
- case NOTIN:
- compResult = !compTupleFilter.getValues().contains(computedVal.toString());
- break;
- default:
- break;
- }
- if (compResult) {
- translated.addChild(new ConstantTupleFilter(dictVal));
- }
- }
- } catch (Exception e) {
- logger.debug(e.getMessage());
- return null;
- }
- return translated;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java
index 6dbc614..2b00d69 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java
@@ -18,6 +18,7 @@
package org.apache.kylin.metadata.filter;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -101,12 +102,12 @@ public class CaseTupleFilter extends TupleFilter {
}
@Override
- public byte[] serialize(IFilterCodeSystem<?> cs) {
- return new byte[0];
+ public void serialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
+ //serialize nothing
}
@Override
- public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
+ public void deserialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
index 0d2a73d..029233d 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
@@ -84,8 +84,7 @@ public class ColumnTupleFilter extends TupleFilter {
}
@Override
- public byte[] serialize(IFilterCodeSystem<?> cs) {
- ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
+ public void serialize(IFilterCodeSystem<?> cs,ByteBuffer buffer) {
String table = columnRef.getTable();
BytesUtil.writeUTFString(table, buffer);
@@ -97,17 +96,13 @@ public class ColumnTupleFilter extends TupleFilter {
String dataType = columnRef.getDatatype();
BytesUtil.writeUTFString(dataType, buffer);
-
- byte[] result = new byte[buffer.position()];
- System.arraycopy(buffer.array(), 0, result, 0, buffer.position());
- return result;
}
@Override
- public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
+ public void deserialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
+
TableDesc table = null;
ColumnDesc column = new ColumnDesc();
- ByteBuffer buffer = ByteBuffer.wrap(bytes);
String tableName = BytesUtil.readUTFString(buffer);
if (tableName != null) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
index 248ab3b..fc0bab7 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
@@ -214,23 +214,19 @@ public class CompareTupleFilter extends TupleFilter {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
- public byte[] serialize(IFilterCodeSystem cs) {
- ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
+ public void serialize(IFilterCodeSystem cs, ByteBuffer buffer) {
int size = this.dynamicVariables.size();
BytesUtil.writeVInt(size, buffer);
for (Map.Entry<String, Object> entry : this.dynamicVariables.entrySet()) {
BytesUtil.writeUTFString(entry.getKey(), buffer);
cs.serialize(entry.getValue(), buffer);
}
- byte[] result = new byte[buffer.position()];
- System.arraycopy(buffer.array(), 0, result, 0, buffer.position());
- return result;
}
@Override
- public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
+ public void deserialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
+
this.dynamicVariables.clear();
- ByteBuffer buffer = ByteBuffer.wrap(bytes);
int size = BytesUtil.readVInt(buffer);
for (int i = 0; i < size; i++) {
String name = BytesUtil.readUTFString(buffer);
http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java
index 3056a9c..db3eb4f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java
@@ -21,8 +21,10 @@ package org.apache.kylin.metadata.filter;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
+import java.util.TreeSet;
+import com.google.common.collect.Lists;
+import org.apache.commons.collections.comparators.NullComparator;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
@@ -40,7 +42,7 @@ public class ConstantTupleFilter extends TupleFilter {
public ConstantTupleFilter() {
super(Collections.<TupleFilter> emptyList(), FilterOperatorEnum.CONSTANT);
- this.constantValues = new HashSet<Object>();
+ this.constantValues = Lists.newArrayList();
}
public ConstantTupleFilter(Object value) {
@@ -89,22 +91,18 @@ public class ConstantTupleFilter extends TupleFilter {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
- public byte[] serialize(IFilterCodeSystem cs) {
- ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
+ public void serialize(IFilterCodeSystem cs, ByteBuffer buffer) {
int size = this.constantValues.size();
BytesUtil.writeVInt(size, buffer);
for (Object val : this.constantValues) {
cs.serialize(val, buffer);
}
- byte[] result = new byte[buffer.position()];
- System.arraycopy(buffer.array(), 0, result, 0, buffer.position());
- return result;
}
@Override
- public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
+ public void deserialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
+
this.constantValues.clear();
- ByteBuffer buffer = ByteBuffer.wrap(bytes);
int size = BytesUtil.readVInt(buffer);
for (int i = 0; i < size; i++) {
this.constantValues.add(cs.deserialize(buffer));
http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java
index a482519..d9dc52a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java
@@ -69,17 +69,12 @@ public class DynamicTupleFilter extends TupleFilter {
}
@Override
- public byte[] serialize(IFilterCodeSystem<?> cs) {
- ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
+ public void serialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
BytesUtil.writeUTFString(variableName, buffer);
- byte[] result = new byte[buffer.position()];
- System.arraycopy(buffer.array(), 0, result, 0, buffer.position());
- return result;
}
@Override
- public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
- ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ public void deserialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
this.variableName = BytesUtil.readUTFString(buffer);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
index 6f7dfaf..591e64b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
@@ -18,6 +18,7 @@
package org.apache.kylin.metadata.filter;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -113,12 +114,12 @@ public class ExtractTupleFilter extends TupleFilter {
}
@Override
- public byte[] serialize(IFilterCodeSystem<?> cs) {
- return new byte[0];
+ public void serialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
+ //do nothing
}
@Override
- public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
+ public void deserialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-metadata/src/main/java/org/apache/kylin/metadata/filter/FunctionTupleFilter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/FunctionTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/FunctionTupleFilter.java
index 30bef97..2a08728 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/FunctionTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/FunctionTupleFilter.java
@@ -123,32 +123,20 @@ public class FunctionTupleFilter extends TupleFilter {
}
@Override
- byte[] serialize(IFilterCodeSystem<?> cs) {
- ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
+ void serialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
BytesUtil.writeUTFString(name, buffer);
BytesUtil.writeVInt(colPosition, buffer);
BytesUtil.writeVInt(isValid ? 1 : 0, buffer);
- BytesUtil.writeByteArray(TupleFilterSerializer.serialize(columnContainerFilter, cs), buffer);
-
- byte[] result = new byte[buffer.position()];
- System.arraycopy(buffer.array(), 0, result, 0, buffer.position());
- return result;
}
@Override
- void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
- ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ public void deserialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
this.name = BytesUtil.readUTFString(buffer);
this.initMethod();
this.colPosition = BytesUtil.readVInt(buffer);
this.isValid = BytesUtil.readVInt(buffer) == 1;
-
- byte[] columnFilter = BytesUtil.readByteArray(buffer);
- if (columnFilter != null) {
- this.columnContainerFilter = TupleFilterSerializer.deserialize(columnFilter, cs);
- }
}
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ITupleFilterTransformer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ITupleFilterTransformer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ITupleFilterTransformer.java
new file mode 100644
index 0000000..d3d5076
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ITupleFilterTransformer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.filter;
+
+public interface ITupleFilterTransformer {
+ TupleFilter transform(TupleFilter tupleFilter);
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ITupleFilterTranslator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ITupleFilterTranslator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ITupleFilterTranslator.java
deleted file mode 100644
index aed284c..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ITupleFilterTranslator.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.filter;
-
-/**
- * Created by dongli on 1/7/16.
- */
-public interface ITupleFilterTranslator {
- TupleFilter translate(TupleFilter tupleFilter);
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
index 0929cf1..61657fb 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
@@ -18,6 +18,7 @@
package org.apache.kylin.metadata.filter;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -46,7 +47,6 @@ public class LogicalTupleFilter extends TupleFilter {
return cloneTuple;
}
-
// private TupleFilter reverseNestedNots(TupleFilter filter, int depth) {
// if ((filter instanceof LogicalTupleFilter) && (filter.operator == FilterOperatorEnum.NOT)) {
// assert (filter.children.size() == 1);
@@ -60,7 +60,6 @@ public class LogicalTupleFilter extends TupleFilter {
// }
// }
-
@Override
public TupleFilter reverse() {
switch (operator) {
@@ -151,12 +150,13 @@ public class LogicalTupleFilter extends TupleFilter {
}
@Override
- public byte[] serialize(IFilterCodeSystem<?> cs) {
- return new byte[0];
+ public void serialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
+ //do nothing
}
@Override
- public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
+ public void deserialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
+
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
index 944ddd0..1e23499 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
@@ -18,6 +18,7 @@
package org.apache.kylin.metadata.filter;
+import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
@@ -204,9 +205,9 @@ public abstract class TupleFilter {
public abstract Collection<?> getValues();
- abstract byte[] serialize(IFilterCodeSystem<?> cs);
+ abstract void serialize(IFilterCodeSystem<?> cs,ByteBuffer buffer);
- abstract void deserialize(byte[] bytes, IFilterCodeSystem<?> cs);
+ abstract void deserialize(IFilterCodeSystem<?> cs,ByteBuffer buffer);
public static boolean isEvaluableRecursively(TupleFilter filter) {
if (filter == null)
http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
index a394a51..39ccb15 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
@@ -18,12 +18,15 @@
package org.apache.kylin.metadata.filter;
+import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Stack;
import org.apache.kylin.common.util.BytesUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* http://eli.thegreenplace.net/2011/09/29/an-interesting-tree-serialization-algorithm-from-dwarf
@@ -33,6 +36,8 @@ import org.apache.kylin.common.util.BytesUtil;
*/
public class TupleFilterSerializer {
+ private static final Logger logger = LoggerFactory.getLogger(TupleFilterSerializer.class);
+
public interface Decorator {
TupleFilter onSerialize(TupleFilter filter);
}
@@ -51,8 +56,18 @@ public class TupleFilterSerializer {
}
public static byte[] serialize(TupleFilter rootFilter, Decorator decorator, IFilterCodeSystem<?> cs) {
- ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
- internalSerialize(rootFilter, decorator, buffer, cs);
+ ByteBuffer buffer;
+ int bufferSize = BUFFER_SIZE;
+ while (true) {
+ try {
+ buffer = ByteBuffer.allocate(bufferSize);
+ internalSerialize(rootFilter, decorator, buffer, cs);
+ break;
+ } catch (BufferOverflowException e) {
+ logger.info("Buffer size {} cannot hold the filter, resizing to 4 times", bufferSize);
+ bufferSize *= 4;
+ }
+ }
byte[] result = new byte[buffer.position()];
System.arraycopy(buffer.array(), 0, result, 0, buffer.position());
return result;
@@ -86,10 +101,9 @@ public class TupleFilterSerializer {
if (flag < 0) {
BytesUtil.writeVInt(-1, buffer);
} else {
- byte[] bytes = filter.serialize(cs);
int opVal = filter.getOperator().getValue();
BytesUtil.writeVInt(opVal, buffer);
- BytesUtil.writeByteArray(bytes, buffer);
+ filter.serialize(cs, buffer);
BytesUtil.writeVInt(flag, buffer);
}
}
@@ -107,8 +121,7 @@ public class TupleFilterSerializer {
// deserialize filter
TupleFilter filter = createTupleFilter(opVal);
- byte[] filterBytes = BytesUtil.readByteArray(buffer);
- filter.deserialize(filterBytes, cs);
+ filter.deserialize(cs, buffer);
if (rootFilter == null) {
// push root to stack
http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java
index 01d3041..5208ba7 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java
@@ -8,11 +8,11 @@ import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.kv.RowKeyColumnIO;
import org.apache.kylin.dict.DictCodeSystem;
import org.apache.kylin.dict.IDictionaryAware;
-import org.apache.kylin.dict.TupleFilterFunctionTranslator;
+import org.apache.kylin.dict.TupleFilterFunctionTransformer;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
import org.apache.kylin.metadata.filter.ConstantTupleFilter;
-import org.apache.kylin.metadata.filter.ITupleFilterTranslator;
+import org.apache.kylin.metadata.filter.ITupleFilterTransformer;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.filter.TupleFilterSerializer;
import org.apache.kylin.metadata.model.TblColRef;
@@ -131,8 +131,8 @@ public class FilterDecorator implements TupleFilterSerializer.Decorator {
if (filter == null)
return null;
- ITupleFilterTranslator translator = new TupleFilterFunctionTranslator(columnIO.getIDictionaryAware());
- filter = translator.translate(filter);
+ ITupleFilterTransformer translator = new TupleFilterFunctionTransformer(columnIO.getIDictionaryAware());
+ filter = translator.transform(filter);
// un-evaluatable filter is replaced with TRUE
if (!filter.isEvaluable()) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
index e96c602..3f00566 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
@@ -23,7 +23,7 @@ import org.apache.kylin.cube.gridtable.CubeGridTable;
import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
import org.apache.kylin.cube.gridtable.NotEnoughGTInfoException;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.TupleFilterFunctionTranslator;
+import org.apache.kylin.dict.TupleFilterFunctionTransformer;
import org.apache.kylin.gridtable.EmptyGTScanner;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
@@ -33,7 +33,7 @@ import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.GTUtil;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.filter.ITupleFilterTranslator;
+import org.apache.kylin.metadata.filter.ITupleFilterTransformer;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.TblColRef;
@@ -63,9 +63,9 @@ public class CubeSegmentScanner implements IGTScanner {
CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
- // translate FunctionTupleFilter to IN clause
- ITupleFilterTranslator translator = new TupleFilterFunctionTranslator(this.cubeSeg);
- filter = translator.translate(filter);
+ // transform FunctionTupleFilter to equivalent IN clause
+ ITupleFilterTransformer translator = new TupleFilterFunctionTransformer(this.cubeSeg);
+ filter = translator.transform(filter);
//replace the constant values in filter to dictionary codes
TupleFilter gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, mapping.getCuboidDimensionsInGTOrder(), groups);
http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterBaseTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterBaseTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterBaseTest.java
index 92e9699..e7ed1a8 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterBaseTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterBaseTest.java
@@ -18,8 +18,12 @@
package org.apache.kylin.storage.hbase.common.coprocessor;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Calendar;
import java.util.Collection;
+import java.util.Date;
import java.util.List;
import java.util.Random;
@@ -39,6 +43,8 @@ import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.tuple.Tuple;
import org.apache.kylin.metadata.tuple.TupleInfo;
+import com.google.common.collect.Lists;
+
/**
* @author xjiang
*
@@ -64,7 +70,7 @@ public class FilterBaseTest {
return groups;
}
- protected CompareTupleFilter buildCompareFilter(List<TblColRef> groups, int index) {
+ protected CompareTupleFilter buildEQCompareFilter(List<TblColRef> groups, int index) {
TblColRef column = groups.get(index);
CompareTupleFilter compareFilter = new CompareTupleFilter(FilterOperatorEnum.EQ);
ColumnTupleFilter columnFilter = new ColumnTupleFilter(column);
@@ -79,9 +85,31 @@ public class FilterBaseTest {
return compareFilter;
}
+ protected CompareTupleFilter buildINCompareFilter(TblColRef dateColumn) throws ParseException {
+ CompareTupleFilter compareFilter = new CompareTupleFilter(FilterOperatorEnum.IN);
+ ColumnTupleFilter columnFilter = new ColumnTupleFilter(dateColumn);
+ compareFilter.addChild(columnFilter);
+
+ List<String> inValues = Lists.newArrayList();
+ SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
+ Date startDate = simpleDateFormat.parse("1970-01-01");
+ Date endDate = simpleDateFormat.parse("2100-01-01");
+ Calendar start = Calendar.getInstance();
+ start.setTime(startDate);
+ Calendar end = Calendar.getInstance();
+ end.setTime(endDate);
+ for (Date date = start.getTime(); start.before(end); start.add(Calendar.DATE, 1), date = start.getTime()) {
+ inValues.add(simpleDateFormat.format(date));
+ }
+
+ ConstantTupleFilter constantFilter = new ConstantTupleFilter(inValues);
+ compareFilter.addChild(constantFilter);
+ return compareFilter;
+ }
+
protected TupleFilter buildAndFilter(List<TblColRef> columns) {
- CompareTupleFilter compareFilter1 = buildCompareFilter(columns, 0);
- CompareTupleFilter compareFilter2 = buildCompareFilter(columns, 1);
+ CompareTupleFilter compareFilter1 = buildEQCompareFilter(columns, 0);
+ CompareTupleFilter compareFilter2 = buildEQCompareFilter(columns, 1);
LogicalTupleFilter andFilter = new LogicalTupleFilter(FilterOperatorEnum.AND);
andFilter.addChild(compareFilter1);
andFilter.addChild(compareFilter2);
@@ -89,8 +117,8 @@ public class FilterBaseTest {
}
protected TupleFilter buildOrFilter(List<TblColRef> columns) {
- CompareTupleFilter compareFilter1 = buildCompareFilter(columns, 0);
- CompareTupleFilter compareFilter2 = buildCompareFilter(columns, 1);
+ CompareTupleFilter compareFilter1 = buildEQCompareFilter(columns, 0);
+ CompareTupleFilter compareFilter2 = buildEQCompareFilter(columns, 1);
LogicalTupleFilter logicFilter = new LogicalTupleFilter(FilterOperatorEnum.OR);
logicFilter.addChild(compareFilter1);
logicFilter.addChild(compareFilter2);
@@ -105,12 +133,12 @@ public class FilterBaseTest {
TupleFilter then0 = new ConstantTupleFilter("0");
caseFilter.addChild(then0);
- TupleFilter when1 = buildCompareFilter(groups, 0);
+ TupleFilter when1 = buildEQCompareFilter(groups, 0);
caseFilter.addChild(when1);
TupleFilter then1 = new ConstantTupleFilter("1");
caseFilter.addChild(then1);
- TupleFilter when2 = buildCompareFilter(groups, 1);
+ TupleFilter when2 = buildEQCompareFilter(groups, 1);
caseFilter.addChild(when2);
TupleFilter then2 = new ConstantTupleFilter("2");
caseFilter.addChild(then2);
@@ -153,9 +181,9 @@ public class FilterBaseTest {
}
String str1 = f1.toString();
- System.out.println("f1=" + str1);
+ //System.out.println("f1=" + str1);
String str2 = f2.toString();
- System.out.println("f2=" + str2);
+ //System.out.println("f2=" + str2);
if (!str1.equals(str2)) {
throw new IllegalStateException("f1=" + str1 + ", f2=" + str2);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterEvaluateTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterEvaluateTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterEvaluateTest.java
index bde8dd2..aac09b7 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterEvaluateTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterEvaluateTest.java
@@ -38,7 +38,7 @@ public class FilterEvaluateTest extends FilterBaseTest {
@Test
public void testEvaluate00() {
List<TblColRef> groups = buildGroups();
- TupleFilter filter = buildCompareFilter(groups, 0);
+ TupleFilter filter = buildEQCompareFilter(groups, 0);
byte[] bytes = TupleFilterSerializer.serialize(filter, CS);
TupleFilter newFilter = TupleFilterSerializer.deserialize(bytes, CS);
@@ -54,7 +54,7 @@ public class FilterEvaluateTest extends FilterBaseTest {
@Test
public void testEvaluate01() {
List<TblColRef> groups = buildGroups();
- TupleFilter filter = buildCompareFilter(groups, 1);
+ TupleFilter filter = buildEQCompareFilter(groups, 1);
byte[] bytes = TupleFilterSerializer.serialize(filter, CS);
TupleFilter newFilter = TupleFilterSerializer.deserialize(bytes, CS);
http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterSerializeTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterSerializeTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterSerializeTest.java
index 8bf8ecb..0a21598 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterSerializeTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterSerializeTest.java
@@ -18,6 +18,7 @@
package org.apache.kylin.storage.hbase.common.coprocessor;
+import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
@@ -40,7 +41,7 @@ public class FilterSerializeTest extends FilterBaseTest {
@Test
public void testSerialize01() {
List<TblColRef> groups = buildGroups();
- TupleFilter filter = buildCompareFilter(groups, 0);
+ TupleFilter filter = buildEQCompareFilter(groups, 0);
byte[] bytes = TupleFilterSerializer.serialize(filter, CS);
TupleFilter newFilter = TupleFilterSerializer.deserialize(bytes, CS);
@@ -51,7 +52,7 @@ public class FilterSerializeTest extends FilterBaseTest {
@Test
public void testSerialize02() {
List<TblColRef> groups = buildGroups();
- TupleFilter filter = buildCompareFilter(groups, 1);
+ TupleFilter filter = buildEQCompareFilter(groups, 1);
byte[] bytes = TupleFilterSerializer.serialize(filter, CS);
TupleFilter newFilter = TupleFilterSerializer.deserialize(bytes, CS);
@@ -88,7 +89,7 @@ public class FilterSerializeTest extends FilterBaseTest {
TblColRef colRef = new TblColRef(column);
List<TblColRef> groups = new ArrayList<TblColRef>();
groups.add(colRef);
- TupleFilter filter = buildCompareFilter(groups, 0);
+ TupleFilter filter = buildEQCompareFilter(groups, 0);
byte[] bytes = TupleFilterSerializer.serialize(filter, CS);
TupleFilter newFilter = TupleFilterSerializer.deserialize(bytes, CS);
@@ -103,7 +104,7 @@ public class FilterSerializeTest extends FilterBaseTest {
TblColRef colRef = new TblColRef(column);
List<TblColRef> groups = new ArrayList<TblColRef>();
groups.add(colRef);
- TupleFilter filter = buildCompareFilter(groups, 0);
+ TupleFilter filter = buildEQCompareFilter(groups, 0);
byte[] bytes = TupleFilterSerializer.serialize(filter, CS);
TupleFilter newFilter = TupleFilterSerializer.deserialize(bytes, CS);
@@ -122,7 +123,7 @@ public class FilterSerializeTest extends FilterBaseTest {
TblColRef colRef = new TblColRef(column);
List<TblColRef> groups = new ArrayList<TblColRef>();
groups.add(colRef);
- TupleFilter filter = buildCompareFilter(groups, 0);
+ TupleFilter filter = buildEQCompareFilter(groups, 0);
byte[] bytes = TupleFilterSerializer.serialize(filter, CS);
TupleFilter newFilter = TupleFilterSerializer.deserialize(bytes, CS);
@@ -140,7 +141,7 @@ public class FilterSerializeTest extends FilterBaseTest {
TblColRef colRef = new TblColRef(column);
List<TblColRef> groups = new ArrayList<TblColRef>();
groups.add(colRef);
- TupleFilter filter = buildCompareFilter(groups, 0);
+ TupleFilter filter = buildEQCompareFilter(groups, 0);
byte[] bytes = TupleFilterSerializer.serialize(filter, CS);
TupleFilter newFilter = TupleFilterSerializer.deserialize(bytes, CS);
@@ -203,6 +204,19 @@ public class FilterSerializeTest extends FilterBaseTest {
}
@Test
+ public void testSerialize14() throws ParseException {
+ List<TblColRef> groups = buildGroups();
+ TupleFilter filter = buildINCompareFilter(groups.get(0));
+
+ long start = System.currentTimeMillis();
+ byte[] bytes = TupleFilterSerializer.serialize(filter, CS);
+ System.out.println("Size of serialized filter " + bytes.length + ", serialize time: " + (System.currentTimeMillis() - start));
+ TupleFilter newFilter = TupleFilterSerializer.deserialize(bytes, CS);
+
+ compareFilter(filter, newFilter);
+ }
+
+ @Test
public void testDynamic() {
final CompareTupleFilter compareDynamicFilter = buildCompareDynamicFilter(buildGroups());
[2/4] kylin git commit: KYLIN-1366 simplify metadata version binding
Posted by ma...@apache.org.
KYLIN-1366 simplify metadata version binding
kylin 1366
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ab9d5791
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ab9d5791
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ab9d5791
Branch: refs/heads/2.x-staging
Commit: ab9d57914a119b5b426d2bfae712ca55ff35c92a
Parents: 4c08ded
Author: honma <ho...@ebay.com>
Authored: Wed Feb 24 15:58:07 2016 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Feb 26 17:54:37 2016 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/KylinConfigBase.java | 8 --------
.../java/org/apache/kylin/common/KylinVersion.java | 2 +-
.../common/persistence/RootPersistentEntity.java | 17 +++++------------
.../java/org/apache/kylin/cube/CubeInstance.java | 5 ++---
.../java/org/apache/kylin/cube/model/CubeDesc.java | 2 +-
.../java/org/apache/kylin/dict/DictionaryInfo.java | 4 ++--
.../apache/kylin/dict/lookup/SnapshotManager.java | 2 +-
.../apache/kylin/metadata/model/DataModelDesc.java | 2 +-
.../kylin/metadata/project/ProjectInstance.java | 2 +-
.../kylin/metadata/project/ProjectManager.java | 2 +-
.../kylin/storage/hybrid/HybridInstance.java | 2 +-
.../org/apache/kylin/invertedindex/IIInstance.java | 2 +-
.../kylin/rest/controller/CubeController.java | 4 ++--
.../storage/hbase/util/ExtendCubeToHybridCLI.java | 4 ++--
14 files changed, 21 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab9d5791/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 5f9983a..7707684 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -61,14 +61,6 @@ public class KylinConfigBase implements Serializable {
return kylinHome;
}
- /**
- * @see KylinVersion
- * @return current kylin version
- */
- public static String getKylinVersion(){
- return KylinVersion.getCurrentVersion();
- }
-
// ============================================================================
private volatile Properties properties = new Properties();
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab9d5791/core-common/src/main/java/org/apache/kylin/common/KylinVersion.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinVersion.java b/core-common/src/main/java/org/apache/kylin/common/KylinVersion.java
index d711b38..42cf237 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinVersion.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinVersion.java
@@ -21,7 +21,7 @@ package org.apache.kylin.common;
*
* @since 2.1
*/
-class KylinVersion {
+public class KylinVersion {
/**
* Require MANUAL updating kylin version per ANY upgrading.
*/
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab9d5791/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java b/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java
index 327ddcc..c46abe7 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java
@@ -26,11 +26,11 @@ import java.util.Date;
import java.util.UUID;
import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.kylin.common.KylinVersion;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kylin.common.KylinConfig;
/**
* Marks the root entity of JSON persistence. Unit of read, write, cache, and
@@ -80,14 +80,14 @@ abstract public class RootPersistentEntity implements AclEntity, Serializable {
* For example: 2.1
*/
@JsonProperty("version")
- protected String version;
+ protected String version = KylinVersion.getCurrentVersion();
public String getVersion() {
- return version;
+ return version;
}
public void setVersion(String version) {
- this.version = version;
+ this.version = version;
}
public String getUuid() {
@@ -110,14 +110,7 @@ abstract public class RootPersistentEntity implements AclEntity, Serializable {
this.lastModified = lastModified;
}
- /**
- * Update entity's "model_version" with current kylin version and "uuid" with random UUID
- *
- * @see KylinConfig#getKylinVersion()
- * @see UUID#randomUUID()
- */
- public void updateVersionAndRandomUuid() {
- setVersion(KylinConfig.getKylinVersion());
+ public void updateRandomUuid() {
setUuid(UUID.randomUUID().toString());
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab9d5791/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index c26e2d2..2862d4f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -63,7 +63,7 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
cubeInstance.setCreateTimeUTC(System.currentTimeMillis());
cubeInstance.setSegments(new ArrayList<CubeSegment>());
cubeInstance.setStatus(RealizationStatusEnum.DISABLED);
- cubeInstance.updateVersionAndRandomUuid();
+ cubeInstance.updateRandomUuid();
return cubeInstance;
}
@@ -427,10 +427,9 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
newCube.setConfig(cubeInstance.getConfig());
newCube.setStatus(cubeInstance.getStatus());
newCube.setOwner(cubeInstance.getOwner());
- newCube.setVersion(cubeInstance.getVersion());
newCube.setCost(cubeInstance.getCost());
newCube.setCreateTimeUTC(System.currentTimeMillis());
- newCube.updateVersionAndRandomUuid();
+ newCube.updateRandomUuid();
return newCube;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab9d5791/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index de73399..49f70f8 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -998,7 +998,7 @@ public class CubeDesc extends RootPersistentEntity {
newCubeDesc.setStorageType(cubeDesc.getStorageType());
newCubeDesc.setAggregationGroups(cubeDesc.getAggregationGroups());
newCubeDesc.setConfig(cubeDesc.getConfig());
- newCubeDesc.updateVersionAndRandomUuid();
+ newCubeDesc.updateRandomUuid();
return newCubeDesc;
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab9d5791/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
index 8e41abf..4fba59a 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
@@ -53,7 +53,7 @@ public class DictionaryInfo extends RootPersistentEntity {
public DictionaryInfo(String sourceTable, String sourceColumn, int sourceColumnIndex, String dataType, TableSignature input) {
- this.updateVersionAndRandomUuid();
+ this.updateRandomUuid();
this.sourceTable = sourceTable;
this.sourceColumn = sourceColumn;
@@ -64,7 +64,7 @@ public class DictionaryInfo extends RootPersistentEntity {
public DictionaryInfo(DictionaryInfo other) {
- this.updateVersionAndRandomUuid();
+ this.updateRandomUuid();
this.sourceTable = other.sourceTable;
this.sourceColumn = other.sourceColumn;
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab9d5791/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
index ccdc79d..53bf60d 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -87,7 +87,7 @@ public class SnapshotManager {
public SnapshotTable buildSnapshot(ReadableTable table, TableDesc tableDesc) throws IOException {
SnapshotTable snapshot = new SnapshotTable(table);
- snapshot.updateVersionAndRandomUuid();
+ snapshot.updateRandomUuid();
String dup = checkDupByInfo(snapshot);
if (dup != null) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab9d5791/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
index c042138..1647707 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
@@ -348,7 +348,7 @@ public class DataModelDesc extends RootPersistentEntity {
newDataModelDesc.setLookups(dataModelDesc.getLookups());
newDataModelDesc.setMetrics(dataModelDesc.getMetrics());
newDataModelDesc.setPartitionDesc(PartitionDesc.getCopyOf(dataModelDesc.getPartitionDesc()));
- newDataModelDesc.updateVersionAndRandomUuid();
+ newDataModelDesc.updateRandomUuid();
return newDataModelDesc;
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab9d5791/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
index 20741ee..e0ed3d9 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
@@ -92,7 +92,7 @@ public class ProjectInstance extends RootPersistentEntity {
public static ProjectInstance create(String name, String owner, String description, List<RealizationEntry> realizationEntries, List<String> models) {
ProjectInstance projectInstance = new ProjectInstance();
- projectInstance.updateVersionAndRandomUuid();
+ projectInstance.updateRandomUuid();
projectInstance.setName(name);
projectInstance.setOwner(owner);
projectInstance.setDescription(description);
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab9d5791/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
index f73239c..b6e99b3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -197,7 +197,7 @@ public class ProjectManager {
project.setDescription(newDesc);
if (project.getUuid() == null)
- project.updateVersionAndRandomUuid();
+ project.updateRandomUuid();
updateProject(project);
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab9d5791/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
index 090efce..251f7c9 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
@@ -85,7 +85,7 @@ public class HybridInstance extends RootPersistentEntity implements IRealization
hybridInstance.setConfig(config);
hybridInstance.setName(name);
hybridInstance.setRealizationEntries(realizationEntries);
- hybridInstance.updateVersionAndRandomUuid();
+ hybridInstance.updateRandomUuid();
return hybridInstance;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab9d5791/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
index 117bc02..9b56c88 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
@@ -60,7 +60,7 @@ public class IIInstance extends RootPersistentEntity implements IRealization, IB
iii.setDescName(iiDesc.getName());
iii.setCreateTimeUTC(System.currentTimeMillis());
iii.setStatus(RealizationStatusEnum.DISABLED);
- iii.updateVersionAndRandomUuid();
+ iii.updateRandomUuid();
return iii;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab9d5791/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 4741cef..9afa750 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -308,7 +308,7 @@ public class CubeController extends BasicController {
isStreamingCube = true;
newStreamingConfig = streamingConfigs.get(0).clone();
newStreamingConfig.setName(newCubeName + "_STREAMING");
- newStreamingConfig.updateVersionAndRandomUuid();
+ newStreamingConfig.updateRandomUuid();
newStreamingConfig.setLastModified(0);
newStreamingConfig.setCubeName(newCubeName);
try {
@@ -327,7 +327,7 @@ public class CubeController extends BasicController {
newKafkaConfig = kafkaConfig.clone();
newKafkaConfig.setName(newStreamingConfig.getName());
newKafkaConfig.setLastModified(0);
- newKafkaConfig.updateVersionAndRandomUuid();
+ newKafkaConfig.updateRandomUuid();
}
} catch (IOException e) {
throw new InternalErrorException("Failed to get kafka config info. ", e);
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab9d5791/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
index 1f46369..c55bde4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
@@ -147,7 +147,7 @@ public class ExtendCubeToHybridCLI {
CubeInstance newCubeInstance = CubeInstance.getCopyOf(cubeInstance);
newCubeInstance.setName(newCubeInstanceName);
newCubeInstance.setDescName(newCubeDescName);
- newCubeInstance.updateVersionAndRandomUuid();
+ newCubeInstance.updateRandomUuid();
Iterator<CubeSegment> segmentIterator = newCubeInstance.getSegments().iterator();
CubeSegment currentSeg = null;
while (segmentIterator.hasNext()) {
@@ -170,7 +170,7 @@ public class ExtendCubeToHybridCLI {
// create new cube for old segments
CubeDesc newCubeDesc = CubeDesc.getCopyOf(cubeDesc);
newCubeDesc.setName(newCubeDescName);
- newCubeDesc.updateVersionAndRandomUuid();
+ newCubeDesc.updateRandomUuid();
newCubeDesc.init(kylinConfig, metadataManager.getAllTablesMap());
newCubeDesc.setPartitionDateEnd(partitionDate);
newCubeDesc.calculateSignature();
[4/4] kylin git commit: KYLIN-1340 CubeMetaExtractor support
streaming case and skip segments
Posted by ma...@apache.org.
KYLIN-1340 CubeMetaExtractor support streaming case and skip segments
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4c08ded6
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4c08ded6
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4c08ded6
Branch: refs/heads/2.x-staging
Commit: 4c08ded63f78aad93eefa9814d48af2486725967
Parents: 2e1d2f6
Author: honma <ho...@ebay.com>
Authored: Wed Feb 24 15:45:38 2016 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Feb 26 17:54:37 2016 +0800
----------------------------------------------------------------------
.../org/apache/kylin/job/CubeMetaExtractor.java | 327 +++++++++++++++++++
.../kylin/common/persistence/ResourceTool.java | 2 +-
.../engine/streaming/StreamingManager.java | 100 +-----
.../kylin/source/kafka/KafkaConfigManager.java | 47 +--
.../kylin/source/kafka/config/KafkaConfig.java | 4 +-
.../storage/hbase/util/CubeMetaExtractor.java | 284 ----------------
6 files changed, 345 insertions(+), 419 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java b/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java
new file mode 100644
index 0000000..527ef0a
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.ResourceTool;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.job.dao.ExecutableDao;
+import org.apache.kylin.job.dao.ExecutablePO;
+import org.apache.kylin.job.exception.PersistentException;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.metadata.project.RealizationEntry;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.RealizationRegistry;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.source.kafka.KafkaConfigManager;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.apache.kylin.storage.hybrid.HybridManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * extract cube related info for debugging/distributing purpose
+ * TODO: deal with II case
+ */
+public class CubeMetaExtractor extends AbstractApplication {
+
+ private static final Logger logger = LoggerFactory.getLogger(CubeMetaExtractor.class);
+
+ // Selector options: cube, hybrid and project are registered together in one required
+ // OptionGroup by the constructor, so they select what gets extracted.
+ @SuppressWarnings("static-access")
+ private static final Option OPTION_CUBE = OptionBuilder.withArgName("cube").hasArg().isRequired(false).withDescription("Specify which cube to extract").create("cube");
+ @SuppressWarnings("static-access")
+ private static final Option OPTION_HYBRID = OptionBuilder.withArgName("hybrid").hasArg().isRequired(false).withDescription("Specify which hybrid to extract").create("hybrid");
+ @SuppressWarnings("static-access")
+ private static final Option OPTION_PROJECT = OptionBuilder.withArgName("project").hasArg().isRequired(false).withDescription("Specify realizations in which project to extract").create("project");
+
+ // Optional switches; execute() treats both as true when they are not given.
+ @SuppressWarnings("static-access")
+ private static final Option OPTION_INCLUDE_SEGMENTS = OptionBuilder.withArgName("includeSegments").hasArg().isRequired(false).withDescription("set this to true if want extract the segments info, related dicts, etc. Default true").create("includeSegments");
+ @SuppressWarnings("static-access")
+ private static final Option OPTION_INCLUDE_JOB = OptionBuilder.withArgName("includeJobs").hasArg().isRequired(false).withDescription("set this to true if want to extract job info/outputs too. Default true").create("includeJobs");
+
+ // Optional destination; when absent executeExtraction() only logs the selection and copies nothing.
+ @SuppressWarnings("static-access")
+ private static final Option OPTION_DEST = OptionBuilder.withArgName("destDir").hasArg().isRequired(false).withDescription("specify the dest dir to save the related metadata").create("destDir");
+
+ // Assigned in the constructor.
+ private Options options = null;
+ // The config and the managers below are all (re)initialized at the start of execute()
+ // from KylinConfig.getInstanceFromEnv().
+ private KylinConfig kylinConfig;
+ private MetadataManager metadataManager;
+ private ProjectManager projectManager;
+ private HybridManager hybridManager;
+ private CubeManager cubeManager;
+ private StreamingManager streamingManager;
+ // NOTE(review): assigned in execute() but not otherwise referenced in this class.
+ private KafkaConfigManager kafkaConfigManager;
+ private CubeDescManager cubeDescManager;
+ private ExecutableDao executableDao;
+ private RealizationRegistry realizationRegistry;
+
+ // Parsed from OPTION_INCLUDE_SEGMENTS / OPTION_INCLUDE_JOB in execute().
+ boolean includeSegments;
+ boolean includeJobs;
+
+ // Paths copied by executeExtraction(); a failure while copying a required path aborts the run.
+ List<String> requiredResources = Lists.newArrayList();
+ // Paths copied by executeExtraction(); a failure while copying an optional path is logged and ignored.
+ List<String> optionalResources = Lists.newArrayList();
+ List<CubeInstance> cubesToTrimAndSave = Lists.newArrayList();//these cubes needs to be saved skipping segments
+
+ /**
+  * Builds the command line definition: a required group holding the cube, project and
+  * hybrid selectors, plus the optional includeSegments, includeJobs and destDir options.
+  */
+ public CubeMetaExtractor() {
+     OptionGroup selectors = new OptionGroup();
+     selectors.addOption(OPTION_CUBE).addOption(OPTION_PROJECT).addOption(OPTION_HYBRID);
+     selectors.setRequired(true);
+
+     Options cliOptions = new Options();
+     cliOptions.addOptionGroup(selectors);
+     cliOptions.addOption(OPTION_INCLUDE_SEGMENTS).addOption(OPTION_INCLUDE_JOB).addOption(OPTION_DEST);
+
+     options = cliOptions;
+ }
+
+ /**
+  * Returns the command line options assembled by the constructor.
+  */
+ @Override
+ protected Options getOptions() {
+     return this.options;
+ }
+
+ /**
+  * Parses the command line, collects the resource paths of the selected project, cube or
+  * hybrid, and then hands over to {@link #executeExtraction(String)}.
+  *
+  * @param optionsHelper the parsed command line
+  * @throws IllegalArgumentException if the named project, cube or hybrid does not exist
+  */
+ @Override
+ protected void execute(OptionsHelper optionsHelper) throws Exception {
+     // both switches default to true when the option is absent
+     includeSegments = !optionsHelper.hasOption(OPTION_INCLUDE_SEGMENTS) || Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_INCLUDE_SEGMENTS));
+     includeJobs = !optionsHelper.hasOption(OPTION_INCLUDE_JOB) || Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_INCLUDE_JOB));
+     // a null dest makes executeExtraction() log the selection without copying anything
+     String dest = optionsHelper.hasOption(OPTION_DEST) ? optionsHelper.getOptionValue(OPTION_DEST) : null;
+
+     kylinConfig = KylinConfig.getInstanceFromEnv();
+     metadataManager = MetadataManager.getInstance(kylinConfig);
+     projectManager = ProjectManager.getInstance(kylinConfig);
+     hybridManager = HybridManager.getInstance(kylinConfig);
+     cubeManager = CubeManager.getInstance(kylinConfig);
+     cubeDescManager = CubeDescManager.getInstance(kylinConfig);
+     streamingManager = StreamingManager.getInstance(kylinConfig);
+     kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig);
+     executableDao = ExecutableDao.getInstance(kylinConfig);
+     realizationRegistry = RealizationRegistry.getInstance(kylinConfig);
+
+     if (optionsHelper.hasOption(OPTION_PROJECT)) {
+         String projectName = optionsHelper.getOptionValue(OPTION_PROJECT);
+         ProjectInstance projectInstance = projectManager.getProject(projectName);
+         if (projectInstance == null) {
+             throw new IllegalArgumentException("Project " + projectName + " does not exist");
+         }
+         addRequired(requiredResources, ProjectInstance.concatResourcePath(projectInstance.getName()));
+         for (RealizationEntry realizationEntry : projectInstance.getRealizationEntries()) {
+             retrieveResourcePath(getRealization(realizationEntry));
+         }
+     } else if (optionsHelper.hasOption(OPTION_CUBE)) {
+         String cubeName = optionsHelper.getOptionValue(OPTION_CUBE);
+         IRealization realization = cubeManager.getRealization(cubeName);
+         if (realization == null) {
+             throw new IllegalArgumentException("No cube found with name of " + cubeName);
+         }
+         retrieveResourcePath(realization);
+     } else if (optionsHelper.hasOption(OPTION_HYBRID)) {
+         String hybridName = optionsHelper.getOptionValue(OPTION_HYBRID);
+         IRealization realization = hybridManager.getRealization(hybridName);
+         if (realization == null) {
+             // the separating space was missing here, gluing the name onto "of"
+             throw new IllegalArgumentException("No hybrid found with name of " + hybridName);
+         }
+         retrieveResourcePath(realization);
+     }
+
+     executeExtraction(dest);
+ }
+
+ /**
+  * Logs everything selected for extraction and, when a destination is given, copies it there.
+  *
+  * Required resources are copied first and any failure aborts the extraction. Optional
+  * resources are copied best-effort. Cubes queued in cubesToTrimAndSave are written to the
+  * destination with their segment list cleared.
+  *
+  * @param dest URI of the destination metadata store, or null to only log the selection
+  * @throws RuntimeException wrapping any IOException raised while copying or saving
+  */
+ private void executeExtraction(String dest) {
+     logger.info("The resource paths going to be extracted:");
+     for (String s : requiredResources) {
+         logger.info("{}(required)", s);
+     }
+     for (String s : optionalResources) {
+         logger.info("{}(optional)", s);
+     }
+     for (CubeInstance cube : cubesToTrimAndSave) {
+         logger.info("Cube {} will be trimmed and extracted", cube);
+     }
+
+     if (dest == null) {
+         logger.info("Dest is not set, exit directly without extracting");
+         return;
+     }
+
+     try {
+         ResourceStore src = ResourceStore.getStore(KylinConfig.getInstanceFromEnv());
+         ResourceStore dst = ResourceStore.getStore(KylinConfig.createInstanceFromUri(dest));
+
+         for (String path : requiredResources) {
+             ResourceTool.copyR(src, dst, path);
+         }
+
+         for (String path : optionalResources) {
+             try {
+                 ResourceTool.copyR(src, dst, path);
+             } catch (Exception e) {
+                 // Deliberately best-effort, but say which path failed and why: the
+                 // placeholder previously had no argument and the exception was dropped.
+                 logger.warn("Exception when copying optional resource {}. May be caused by resource missing. Ignore it.", path, e);
+             }
+         }
+
+         for (CubeInstance cube : cubesToTrimAndSave) {
+             CubeInstance trimmedCube = CubeInstance.getCopyOf(cube);
+             trimmedCube.getSegments().clear();
+             trimmedCube.setUuid(cube.getUuid());
+             dst.putResource(trimmedCube.getResourcePath(), trimmedCube, CubeManager.CUBE_SERIALIZER);
+         }
+
+     } catch (IOException e) {
+         throw new RuntimeException("IOException", e);
+     }
+ }
+
+ /**
+  * Looks up the realization a project entry refers to, using the entry's type and
+  * realization name as the registry key.
+  */
+ private IRealization getRealization(RealizationEntry realizationEntry) {
+     RealizationType type = realizationEntry.getType();
+     String name = realizationEntry.getRealization();
+     return realizationRegistry.getRealization(type, name);
+ }
+
+ /**
+  * Marks as required the streaming config and the kafka config of every streaming
+  * definition whose cube name matches the given cube (case-insensitively).
+  *
+  * The kafka config path is built from the streaming config's name.
+  * NOTE(review): this presumes both configs share one name - confirm.
+  */
+ private void dealWithStreaming(CubeInstance cube) {
+     for (StreamingConfig streamingConfig : streamingManager.listAllStreaming()) {
+         String boundCube = streamingConfig.getCubeName();
+         if (boundCube != null && boundCube.equalsIgnoreCase(cube.getName())) {
+             // go through addRequired so these paths are logged like every other required resource
+             addRequired(requiredResources, StreamingConfig.concatResourcePath(streamingConfig.getName()));
+             addRequired(requiredResources, KafkaConfig.concatResourcePath(streamingConfig.getName()));
+         }
+     }
+ }
+
+ /**
+  * Collects the metadata resource paths that belong to the given realization.
+  *
+  * A cube contributes its tables, model, cube desc, streaming configs and, depending on
+  * includeSegments / includeJobs, its segments and their last build jobs. A hybrid
+  * contributes itself plus everything of its child cubes.
+  *
+  * @throws RuntimeException if a hybrid contains a non-cube child, or if job info is
+  *         requested for a segment that has no last build job
+  * @throws IllegalStateException for II instances and unknown realization types
+  */
+ private void retrieveResourcePath(IRealization realization) {
+
+     logger.info("Deal with realization {} of type {}", realization.getName(), realization.getType());
+
+     if (realization instanceof CubeInstance) {
+         retrieveCubeResourcePath((CubeInstance) realization);
+     } else if (realization instanceof HybridInstance) {
+         HybridInstance hybridInstance = (HybridInstance) realization;
+         addRequired(requiredResources, HybridInstance.concatResourcePath(hybridInstance.getName()));
+         for (IRealization child : hybridInstance.getRealizations()) {
+             if (child.getType() != RealizationType.CUBE) {
+                 // report the hybrid's own name; the child's name used to be printed twice
+                 throw new RuntimeException("Hybrid " + hybridInstance.getName() + " contains non cube child " + child.getName() + " with type " + child.getType());
+             }
+             retrieveResourcePath(child);
+         }
+     } else if (realization instanceof IIInstance) {
+         throw new IllegalStateException("Does not support extract II instance or hybrid that contains II");
+     } else {
+         throw new IllegalStateException("Unknown realization type: " + realization.getType());
+     }
+ }
+
+ /** Collects the paths of one cube: streaming configs, tables, model, cube desc and segments. */
+ private void retrieveCubeResourcePath(CubeInstance cube) {
+     CubeDesc cubeDesc = cubeDescManager.getCubeDesc(cube.getDescName());
+     DataModelDesc modelDesc = metadataManager.getDataModelDesc(cubeDesc.getModelName());
+
+     dealWithStreaming(cube);
+
+     for (String tableName : modelDesc.getAllTables()) {
+         addRequired(requiredResources, TableDesc.concatResourcePath(tableName));
+         addOptional(optionalResources, TableDesc.concatExdResourcePath(tableName));
+     }
+
+     addRequired(requiredResources, DataModelDesc.concatResourcePath(modelDesc.getName()));
+     addRequired(requiredResources, CubeDesc.concatResourcePath(cubeDesc.getName()));
+
+     if (!includeSegments) {
+         if (includeJobs) {
+             logger.warn("It's useless to set includeJobs to true when includeSegments is set to false");
+         }
+         // the cube instance is saved later with its segments stripped
+         cubesToTrimAndSave.add(cube);
+         return;
+     }
+
+     addRequired(requiredResources, CubeInstance.concatResourcePath(cube.getName()));
+     for (CubeSegment segment : cube.getSegments(SegmentStatusEnum.READY)) {
+         retrieveSegmentResourcePath(segment);
+     }
+ }
+
+ /** Collects dictionaries, snapshots, statistics and (if requested) the last build job of one segment. */
+ private void retrieveSegmentResourcePath(CubeSegment segment) {
+     for (String dictPath : segment.getDictionaryPaths()) {
+         addRequired(requiredResources, dictPath);
+     }
+     for (String snapshotPath : segment.getSnapshotPaths()) {
+         addRequired(requiredResources, snapshotPath);
+     }
+     addRequired(requiredResources, segment.getStatisticsResourcePath());
+
+     if (!includeJobs) {
+         logger.info("Job info will not be extracted");
+         return;
+     }
+
+     String lastJobId = segment.getLastBuildJobID();
+     // The check used to be negated: it threw for every segment that did have a job id
+     // and looked up an empty id otherwise.
+     if (StringUtils.isEmpty(lastJobId)) {
+         throw new RuntimeException("No job exist for segment :" + segment);
+     }
+
+     try {
+         ExecutablePO executablePO = executableDao.getJob(lastJobId);
+         addRequired(requiredResources, ExecutableDao.pathOfJob(lastJobId));
+         addRequired(requiredResources, ExecutableDao.pathOfJobOutput(lastJobId));
+         for (ExecutablePO task : executablePO.getTasks()) {
+             addRequired(requiredResources, ExecutableDao.pathOfJob(task.getUuid()));
+             addRequired(requiredResources, ExecutableDao.pathOfJobOutput(task.getUuid()));
+         }
+     } catch (PersistentException e) {
+         throw new RuntimeException("PersistentException", e);
+     }
+ }
+
+ /**
+  * Logs the given path as a required resource and appends it to the supplied list.
+  */
+ private void addRequired(final List<String> resourcePaths, final String record) {
+     logger.info("adding required resource {}", record);
+     resourcePaths.add(record);
+ }
+
+ /**
+  * Logs the given path as an optional resource and appends it to the supplied list.
+  */
+ private void addOptional(final List<String> optionalPaths, final String record) {
+     logger.info("adding optional resource {}", record);
+     optionalPaths.add(record);
+ }
+
+ /**
+  * Command line entry point: runs a fresh extractor with the given arguments.
+  */
+ public static void main(String[] args) {
+     new CubeMetaExtractor().execute(args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
index 3b8e0c1..489e45a 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
@@ -115,7 +115,7 @@ public class ResourceTool {
copyR(src, dst, "/");
}
- private static void copyR(ResourceStore src, ResourceStore dst, String path) throws IOException {
+ public static void copyR(ResourceStore src, ResourceStore dst, String path) throws IOException {
ArrayList<String> children = src.listResources(path);
// case of resource (not a folder)
http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
index af04a11..e0b086d 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
@@ -34,24 +34,14 @@
package org.apache.kylin.engine.streaming;
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
-import org.apache.kylin.common.persistence.RawResource;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.restclient.Broadcaster;
@@ -60,13 +50,6 @@ import org.apache.kylin.metadata.MetadataConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.MapType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
/**
*/
public class StreamingManager {
@@ -121,18 +104,6 @@ public class StreamingManager {
}
}
- private String formatStreamingConfigPath(String name) {
- return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
- }
-
- private String formatStreamingOutputPath(String streaming, int partition) {
- return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + partition + ".json";
- }
-
- private String formatStreamingOutputPath(String streaming, List<Integer> partitions) {
- return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + StringUtils.join(partitions, "_") + ".json";
- }
-
public StreamingConfig getStreamingConfig(String name) {
return streamingMap.get(name);
}
@@ -214,77 +185,12 @@ public class StreamingManager {
if (streamingMap.containsKey(streamingConfig.getName()))
throw new IllegalArgumentException("StreamingConfig '" + streamingConfig.getName() + "' already exists");
- String path = formatStreamingConfigPath(streamingConfig.getName());
+ String path = StreamingConfig.concatResourcePath(streamingConfig.getName());
getStore().putResource(path, streamingConfig, StreamingConfig.SERIALIZER);
streamingMap.put(streamingConfig.getName(), streamingConfig);
return streamingConfig;
}
- public long getOffset(String streaming, int shard) {
- final String resPath = formatStreamingOutputPath(streaming, shard);
- InputStream inputStream = null;
- try {
- final RawResource res = getStore().getResource(resPath);
- if (res == null) {
- return 0;
- } else {
- inputStream = res.inputStream;
- final BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
- return Long.parseLong(br.readLine());
- }
- } catch (Exception e) {
- logger.error("error get offset, path:" + resPath, e);
- throw new RuntimeException("error get offset, path:" + resPath, e);
- } finally {
- IOUtils.closeQuietly(inputStream);
- }
- }
-
- public void updateOffset(String streaming, int shard, long offset) {
- Preconditions.checkArgument(offset >= 0, "offset cannot be smaller than 0");
- final String resPath = formatStreamingOutputPath(streaming, shard);
- try {
- getStore().putResource(resPath, new ByteArrayInputStream(Long.valueOf(offset).toString().getBytes()), getStore().getResourceTimestamp(resPath));
- } catch (IOException e) {
- logger.error("error update offset, path:" + resPath, e);
- throw new RuntimeException("error update offset, path:" + resPath, e);
- }
- }
-
- public Map<Integer, Long> getOffset(String streaming, List<Integer> partitions) {
- Collections.sort(partitions);
- final String resPath = formatStreamingOutputPath(streaming, partitions);
- InputStream inputStream = null;
- try {
- RawResource res = getStore().getResource(resPath);
- if (res == null)
- return Collections.emptyMap();
-
- inputStream = res.inputStream;
- final HashMap<Integer, Long> result = mapper.readValue(inputStream, mapType);
- return result;
- } catch (IOException e) {
- logger.error("error get offset, path:" + resPath, e);
- throw new RuntimeException("error get offset, path:" + resPath, e);
- } finally {
- IOUtils.closeQuietly(inputStream);
- }
- }
-
- public void updateOffset(String streaming, HashMap<Integer, Long> offset) {
- List<Integer> partitions = Lists.newLinkedList(offset.keySet());
- Collections.sort(partitions);
- final String resPath = formatStreamingOutputPath(streaming, partitions);
- try {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- mapper.writeValue(baos, offset);
- getStore().putResource(resPath, new ByteArrayInputStream(baos.toByteArray()), getStore().getResourceTimestamp(resPath));
- } catch (IOException e) {
- logger.error("error update offset, path:" + resPath, e);
- throw new RuntimeException("error update offset, path:" + resPath, e);
- }
- }
-
private StreamingConfig loadStreamingConfigAt(String path) throws IOException {
ResourceStore store = getStore();
StreamingConfig streamingDesc = store.getResource(path, StreamingConfig.class, STREAMING_SERIALIZER);
@@ -324,8 +230,4 @@ public class StreamingManager {
logger.debug("Loaded " + streamingMap.size() + " StreamingConfig(s)");
}
-
- private final ObjectMapper mapper = new ObjectMapper();
- private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(Integer.class), SimpleType.construct(Long.class));
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index ac20fc3..1d07f23 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -36,7 +36,6 @@ package org.apache.kylin.source.kafka;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
@@ -52,11 +51,6 @@ import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.MapType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-
/**
*/
public class KafkaConfigManager {
@@ -87,7 +81,7 @@ public class KafkaConfigManager {
return ResourceStore.getStore(this.config);
}
- public static KafkaConfigManager getInstance(KylinConfig config){
+ public static KafkaConfigManager getInstance(KylinConfig config) {
KafkaConfigManager r = CACHE.get(config);
if (r != null) {
return r;
@@ -98,16 +92,16 @@ public class KafkaConfigManager {
if (r != null) {
return r;
}
- try{
- r = new KafkaConfigManager(config);
- CACHE.put(config, r);
- if (CACHE.size() > 1) {
- logger.warn("More than one KafkaConfigManager singleton exist");
+ try {
+ r = new KafkaConfigManager(config);
+ CACHE.put(config, r);
+ if (CACHE.size() > 1) {
+ logger.warn("More than one KafkaConfigManager singleton exist");
+ }
+ return r;
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to init KafkaConfigManager from " + config, e);
}
- return r;
- } catch (IOException e) {
- throw new IllegalStateException("Failed to init KafkaConfigManager from " + config, e);
- }
}
}
@@ -125,7 +119,7 @@ public class KafkaConfigManager {
public KafkaConfig reloadKafkaConfigLocal(String name) throws IOException {
// Save Source
- String path = KafkaConfig.getKafkaResourcePath(name);
+ String path = KafkaConfig.concatResourcePath(name);
// Reload the KafkaConfig
KafkaConfig ndesc = loadKafkaConfigAt(path);
@@ -135,14 +129,6 @@ public class KafkaConfigManager {
return ndesc;
}
- private boolean checkExistence(String name) {
- return true;
- }
-
- private String formatStreamingConfigPath(String name) {
- return ResourceStore.KAFKA_RESOURCE_ROOT + "/" + name + ".json";
- }
-
public boolean createKafkaConfig(String name, KafkaConfig config) {
if (config == null || StringUtils.isEmpty(config.getName())) {
@@ -152,7 +138,7 @@ public class KafkaConfigManager {
if (kafkaMap.containsKey(config.getName()))
throw new IllegalArgumentException("KafkaConfig '" + config.getName() + "' already exists");
try {
- getStore().putResource(formatStreamingConfigPath(name), config, KafkaConfig.SERIALIZER);
+ getStore().putResource(KafkaConfig.concatResourcePath(name), config, KafkaConfig.SERIALIZER);
kafkaMap.put(config.getName(), config);
return true;
} catch (IOException e) {
@@ -185,7 +171,7 @@ public class KafkaConfigManager {
private KafkaConfig loadKafkaConfigAt(String path) throws IOException {
ResourceStore store = getStore();
- KafkaConfig kafkaConfig = store.getResource(path, KafkaConfig.class,KAFKA_SERIALIZER );
+ KafkaConfig kafkaConfig = store.getResource(path, KafkaConfig.class, KAFKA_SERIALIZER);
if (StringUtils.isBlank(kafkaConfig.getName())) {
throw new IllegalStateException("KafkaConfig name must not be blank");
@@ -193,7 +179,6 @@ public class KafkaConfigManager {
return kafkaConfig;
}
-
public KafkaConfig getKafkaConfig(String name) {
return kafkaMap.get(name);
}
@@ -203,7 +188,7 @@ public class KafkaConfigManager {
throw new IllegalArgumentException();
}
- String path = formatStreamingConfigPath(kafkaConfig.getName());
+ String path = KafkaConfig.concatResourcePath(kafkaConfig.getName());
getStore().putResource(path, kafkaConfig, KafkaConfig.SERIALIZER);
}
@@ -214,7 +199,6 @@ public class KafkaConfigManager {
kafkaMap.remove(kafkaConfig.getName());
}
-
private void reloadAllKafkaConfig() throws IOException {
ResourceStore store = getStore();
logger.info("Reloading Kafka Metadata from folder " + store.getReadableResourcePath(ResourceStore.KAFKA_RESOURCE_ROOT));
@@ -245,7 +229,4 @@ public class KafkaConfigManager {
logger.debug("Loaded " + kafkaMap.size() + " KafkaConfig(s)");
}
- private final ObjectMapper mapper = new ObjectMapper();
- private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(Integer.class), SimpleType.construct(Long.class));
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index 100ca2d..1dce844 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -85,10 +85,10 @@ public class KafkaConfig extends RootPersistentEntity {
private String parserProperties;
public String getResourcePath() {
- return getKafkaResourcePath(name);
+ return concatResourcePath(name);
}
- public static String getKafkaResourcePath(String streamingName) {
+ public static String concatResourcePath(String streamingName) {
return ResourceStore.KAFKA_RESOURCE_ROOT + "/" + streamingName + MetadataConstants.FILE_SURFIX;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMetaExtractor.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMetaExtractor.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMetaExtractor.java
deleted file mode 100644
index 680dff8..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMetaExtractor.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase.util;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.OptionGroup;
-import org.apache.commons.cli.Options;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceTool;
-import org.apache.kylin.common.util.AbstractApplication;
-import org.apache.kylin.common.util.OptionsHelper;
-import org.apache.kylin.cube.CubeDescManager;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.invertedindex.IIDescManager;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.job.dao.ExecutableDao;
-import org.apache.kylin.job.dao.ExecutablePO;
-import org.apache.kylin.job.exception.PersistentException;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.metadata.project.ProjectManager;
-import org.apache.kylin.metadata.project.RealizationEntry;
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.metadata.realization.RealizationRegistry;
-import org.apache.kylin.storage.hybrid.HybridInstance;
-import org.apache.kylin.storage.hybrid.HybridManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- * extract cube related info for debugging/distributing purpose
- * TODO: deal with II case, deal with Streaming case
- */
-public class CubeMetaExtractor extends AbstractApplication {
-
- private static final Logger logger = LoggerFactory.getLogger(CubeMetaExtractor.class);
-
- @SuppressWarnings("static-access")
- private static final Option OPTION_CUBE = OptionBuilder.withArgName("cube").hasArg().isRequired(false).withDescription("Specify which cube to extract").create("cube");
- @SuppressWarnings("static-access")
- private static final Option OPTION_HYBRID = OptionBuilder.withArgName("hybrid").hasArg().isRequired(false).withDescription("Specify which hybrid to extract").create("hybrid");
- @SuppressWarnings("static-access")
- private static final Option OPTION_PROJECT = OptionBuilder.withArgName("project").hasArg().isRequired(false).withDescription("Specify realizations in which project to extract").create("project");
-
- @SuppressWarnings("static-access")
- private static final Option OPTION_INCLUDE_SEGMENTS = OptionBuilder.withArgName("includeSegments").hasArg().isRequired(false).withDescription("set this to true if want extract the segments info, related dicts, etc.").create("includeSegments");
- @SuppressWarnings("static-access")
- private static final Option OPTION_INCLUDE_JOB = OptionBuilder.withArgName("includeJobs").hasArg().isRequired(false).withDescription("set this to true if want to extract job info/outputs too").create("includeJobs");
-
- @SuppressWarnings("static-access")
- private static final Option OPTION_DEST = OptionBuilder.withArgName("destDir").hasArg().isRequired(false).withDescription("specify the dest dir to save the related metadata").create("destDir");
-
- private Options options = null;
- private KylinConfig kylinConfig;
- private MetadataManager metadataManager;
- private ProjectManager projectManager;
- private HybridManager hybridManager;
- private CubeManager cubeManager;
- private CubeDescManager cubeDescManager;
- private IIManager iiManager;
- private IIDescManager iiDescManager;
- private ExecutableDao executableDao;
- RealizationRegistry realizationRegistry;
-
- public CubeMetaExtractor() {
- options = new Options();
-
- OptionGroup realizationOrProject = new OptionGroup();
- realizationOrProject.addOption(OPTION_CUBE);
- realizationOrProject.addOption(OPTION_PROJECT);
- realizationOrProject.addOption(OPTION_HYBRID);
- realizationOrProject.setRequired(true);
-
- options.addOptionGroup(realizationOrProject);
- options.addOption(OPTION_INCLUDE_SEGMENTS);
- options.addOption(OPTION_INCLUDE_JOB);
- options.addOption(OPTION_DEST);
-
- }
-
- @Override
- protected Options getOptions() {
- return options;
- }
-
- @Override
- protected void execute(OptionsHelper optionsHelper) throws Exception {
- boolean includeSegments = optionsHelper.hasOption(OPTION_INCLUDE_SEGMENTS) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_SEGMENTS)) : true;
- boolean includeJobs = optionsHelper.hasOption(OPTION_INCLUDE_JOB) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_JOB)) : true;
- String dest = null;
- if (optionsHelper.hasOption(OPTION_DEST)) {
- dest = optionsHelper.getOptionValue(OPTION_DEST);
- }
-
- if (!includeSegments) {
- throw new RuntimeException("Does not support skip segments for now");
- }
-
- kylinConfig = KylinConfig.getInstanceFromEnv();
- metadataManager = MetadataManager.getInstance(kylinConfig);
- projectManager = ProjectManager.getInstance(kylinConfig);
- hybridManager = HybridManager.getInstance(kylinConfig);
- cubeManager = CubeManager.getInstance(kylinConfig);
- cubeDescManager = CubeDescManager.getInstance(kylinConfig);
- iiManager = IIManager.getInstance(kylinConfig);
- iiDescManager = IIDescManager.getInstance(kylinConfig);
- executableDao = ExecutableDao.getInstance(kylinConfig);
- realizationRegistry = RealizationRegistry.getInstance(kylinConfig);
-
- List<String> requiredResources = Lists.newArrayList();
- List<String> optionalResources = Lists.newArrayList();
-
- if (optionsHelper.hasOption(OPTION_PROJECT)) {
- ProjectInstance projectInstance = projectManager.getProject(optionsHelper.getOptionValue(OPTION_PROJECT));
- if (projectInstance == null) {
- throw new IllegalArgumentException("Project " + optionsHelper.getOptionValue(OPTION_PROJECT) + " does not exist");
- }
- addRequired(requiredResources, ProjectInstance.concatResourcePath(projectInstance.getName()));
- List<RealizationEntry> realizationEntries = projectInstance.getRealizationEntries();
- for (RealizationEntry realizationEntry : realizationEntries) {
- retrieveResourcePath(getRealization(realizationEntry), includeSegments, includeJobs, requiredResources, optionalResources);
- }
- } else if (optionsHelper.hasOption(OPTION_CUBE)) {
- String cubeName = optionsHelper.getOptionValue(OPTION_CUBE);
- IRealization realization;
-
- if ((realization = cubeManager.getRealization(cubeName)) != null) {
- retrieveResourcePath(realization, includeSegments, includeJobs, requiredResources, optionalResources);
- } else {
- throw new IllegalArgumentException("No cube found with name of " + cubeName);
- }
- } else if (optionsHelper.hasOption(OPTION_HYBRID)) {
- String hybridName = optionsHelper.getOptionValue(OPTION_HYBRID);
- IRealization realization;
-
- if ((realization = hybridManager.getRealization(hybridName)) != null) {
- retrieveResourcePath(realization, includeSegments, includeJobs, requiredResources, optionalResources);
- } else {
- throw new IllegalArgumentException("No hybrid found with name of" + hybridName);
- }
- }
-
- executeExtraction(requiredResources, optionalResources, dest);
- }
-
- private void executeExtraction(List<String> requiredPaths, List<String> optionalPaths, String dest) {
- logger.info("The resource paths going to be extracted:");
- for (String s : requiredPaths) {
- logger.info(s + "(required)");
- }
- for (String s : optionalPaths) {
- logger.info(s + "(optional)");
- }
-
- if (dest == null) {
- logger.info("Dest is not set, exit directly without extracting");
- } else {
- try {
- ResourceTool.copy(KylinConfig.getInstanceFromEnv(), KylinConfig.createInstanceFromUri(dest));
- } catch (IOException e) {
- throw new RuntimeException("IOException", e);
- }
- }
- }
-
- private IRealization getRealization(RealizationEntry realizationEntry) {
- return realizationRegistry.getRealization(realizationEntry.getType(), realizationEntry.getRealization());
- }
-
- private void retrieveResourcePath(IRealization realization, boolean includeSegments, boolean includeJobs, List<String> requiredResources, List<String> optionalResources) {
-
- logger.info("Deal with realization {} of type {}", realization.getName(), realization.getType());
-
- if (realization instanceof CubeInstance) {
- CubeInstance cube = (CubeInstance) realization;
- String descName = cube.getDescName();
- CubeDesc cubeDesc = cubeDescManager.getCubeDesc(descName);
- String modelName = cubeDesc.getModelName();
- DataModelDesc modelDesc = metadataManager.getDataModelDesc(modelName);
-
- for (String tableName : modelDesc.getAllTables()) {
- addRequired(requiredResources, TableDesc.concatResourcePath(tableName));
- addOptional(optionalResources, TableDesc.concatExdResourcePath(tableName));
- }
-
- addRequired(requiredResources, DataModelDesc.concatResourcePath(modelDesc.getName()));
- addRequired(requiredResources, CubeDesc.concatResourcePath(cubeDesc.getName()));
-
- if (includeSegments) {
- addRequired(requiredResources, CubeInstance.concatResourcePath(cube.getName()));
- for (CubeSegment segment : cube.getSegments()) {
- for (String dictPat : segment.getDictionaryPaths()) {
- addRequired(requiredResources, dictPat);
- }
- for (String snapshotPath : segment.getSnapshotPaths()) {
- addRequired(requiredResources, snapshotPath);
- }
- addRequired(requiredResources, segment.getStatisticsResourcePath());
-
- if (includeJobs) {
- String lastJobId = segment.getLastBuildJobID();
- if (!StringUtils.isEmpty(lastJobId)) {
- logger.warn("No job exist for segment {}", segment);
- } else {
- try {
- ExecutablePO executablePO = executableDao.getJob(lastJobId);
- addRequired(requiredResources, ExecutableDao.pathOfJob(lastJobId));
- addRequired(requiredResources, ExecutableDao.pathOfJobOutput(lastJobId));
- for (ExecutablePO task : executablePO.getTasks()) {
- addRequired(requiredResources, ExecutableDao.pathOfJob(task.getUuid()));
- addRequired(requiredResources, ExecutableDao.pathOfJobOutput(task.getUuid()));
- }
- } catch (PersistentException e) {
- throw new RuntimeException("PersistentException", e);
- }
- }
- } else {
- logger.info("Job info will not be extracted");
- }
- }
- } else {
- if (includeJobs) {
- logger.warn("It's useless to set includeJobs to true when includeSegments is set to false");
- }
-
- throw new IllegalStateException("Does not support skip segments now");
- }
- } else if (realization instanceof HybridInstance) {
- HybridInstance hybridInstance = (HybridInstance) realization;
- addRequired(requiredResources, HybridInstance.concatResourcePath(hybridInstance.getName()));
- for (IRealization iRealization : hybridInstance.getRealizations()) {
- retrieveResourcePath(iRealization, includeSegments, includeJobs, requiredResources, optionalResources);
- }
- } else if (realization instanceof IIInstance) {
- throw new IllegalStateException("Does not support extract II instance or hybrid that contains II");
- } else {
- throw new IllegalStateException("Unknown realization type: " + realization.getType());
- }
- }
-
- private void addRequired(List<String> resourcePaths, String record) {
- logger.info("adding required resource {}", record);
- resourcePaths.add(record);
- }
-
- private void addOptional(List<String> optionalPaths, String record) {
- logger.info("adding optional resource {}", record);
- optionalPaths.add(record);
- }
-
- public static void main(String[] args) {
- CubeMetaExtractor extractor = new CubeMetaExtractor();
- extractor.execute(args);
- }
-}
[3/4] kylin git commit: minor, fix CI occasionally fail issue
Posted by ma...@apache.org.
minor, fix CI occasionally fail issue
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3f5074ee
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3f5074ee
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3f5074ee
Branch: refs/heads/2.x-staging
Commit: 3f5074ee1568d5b0ba50d70d5c35319cd8223cc9
Parents: ab9d579
Author: honma <ho...@ebay.com>
Authored: Thu Feb 25 14:13:48 2016 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Feb 26 17:54:37 2016 +0800
----------------------------------------------------------------------
.../dict/TupleFilterDictionaryTranslater.java | 165 ------------------
.../dict/TupleFilterFunctionTranslator.java | 166 +++++++++++++++++++
.../metadata/filter/FunctionTupleFilter.java | 5 +-
.../metadata/filter/function/BuiltInMethod.java | 33 ++--
.../cache/AbstractCacheFledgedQuery.java | 32 +---
.../kylin/storage/cache/DynamicCacheTest.java | 15 +-
.../kylin/storage/cache/StaticCacheTest.java | 19 ++-
.../kylin/storage/hbase/ITStorageTest.java | 11 +-
.../common/coprocessor/FilterDecorator.java | 4 +-
.../hbase/cube/v2/CubeSegmentScanner.java | 4 +-
10 files changed, 226 insertions(+), 228 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/3f5074ee/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterDictionaryTranslater.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterDictionaryTranslater.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterDictionaryTranslater.java
deleted file mode 100644
index 9ef360d..0000000
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterDictionaryTranslater.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.dict;
-
-import com.google.common.primitives.Primitives;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.metadata.filter.ColumnTupleFilter;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.ConstantTupleFilter;
-import org.apache.kylin.metadata.filter.FunctionTupleFilter;
-import org.apache.kylin.metadata.filter.ITupleFilterTranslator;
-import org.apache.kylin.metadata.filter.LogicalTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ListIterator;
-
-/**
- * Created by dongli on 1/7/16.
- */
-public class TupleFilterDictionaryTranslater implements ITupleFilterTranslator {
- public static final Logger logger = LoggerFactory.getLogger(TupleFilterDictionaryTranslater.class);
-
- private IDictionaryAware dictionaryAware;
-
- public TupleFilterDictionaryTranslater(IDictionaryAware dictionaryAware) {
- this.dictionaryAware = dictionaryAware;
- }
-
- @Override
- public TupleFilter translate(TupleFilter tupleFilter) {
- TupleFilter translated = null;
- if (tupleFilter instanceof CompareTupleFilter) {
- translated = translateCompareTupleFilter((CompareTupleFilter) tupleFilter);
- if (translated != null) {
- logger.info("Translated {" + tupleFilter + "} to IN clause: {" + translated + "}");
- }
- } else if (tupleFilter instanceof FunctionTupleFilter) {
- translated = translateFunctionTupleFilter((FunctionTupleFilter) tupleFilter);
- if (translated != null) {
- logger.info("Translated {" + tupleFilter + "} to IN clause: {" + translated + "}");
- }
- } else if (tupleFilter instanceof LogicalTupleFilter) {
- ListIterator<TupleFilter> childIterator = (ListIterator<TupleFilter>) tupleFilter.getChildren().listIterator();
- while (childIterator.hasNext()) {
- TupleFilter tempTranslated = translate(childIterator.next());
- if (tempTranslated != null)
- childIterator.set(tempTranslated);
- }
- }
- return translated == null ? tupleFilter : translated;
- }
-
- private TupleFilter translateFunctionTupleFilter(FunctionTupleFilter functionTupleFilter) {
- if (!functionTupleFilter.isValid())
- return null;
-
- TblColRef columnRef = functionTupleFilter.getColumn();
- Dictionary<?> dict = dictionaryAware.getDictionary(columnRef);
- if (dict == null)
- return null;
-
- CompareTupleFilter translated = new CompareTupleFilter(FilterOperatorEnum.IN);
- translated.addChild(new ColumnTupleFilter(columnRef));
-
- try {
- for (int i = dict.getMinId(); i <= dict.getMaxId(); i++) {
- Object dictVal = dict.getValueFromId(i);
- if ((Boolean) functionTupleFilter.invokeFunction(dictVal)) {
- translated.addChild(new ConstantTupleFilter(dictVal));
- }
- }
- } catch (Exception e) {
- logger.debug(e.getMessage());
- return null;
- }
- return translated;
- }
-
- @SuppressWarnings("unchecked")
- private TupleFilter translateCompareTupleFilter(CompareTupleFilter compTupleFilter) {
- if (compTupleFilter.getFunction() == null)
- return null;
-
- FunctionTupleFilter functionTupleFilter = compTupleFilter.getFunction();
- if (!functionTupleFilter.isValid())
- return null;
-
- TblColRef columnRef = functionTupleFilter.getColumn();
- Dictionary<?> dict = dictionaryAware.getDictionary(columnRef);
- if (dict == null)
- return null;
-
- CompareTupleFilter translated = new CompareTupleFilter(FilterOperatorEnum.IN);
- translated.addChild(new ColumnTupleFilter(columnRef));
-
- try {
- for (int i = dict.getMinId(); i <= dict.getMaxId(); i++) {
- Object dictVal = dict.getValueFromId(i);
- Object computedVal = functionTupleFilter.invokeFunction(dictVal);
- Class clazz = Primitives.wrap(computedVal.getClass());
- Object targetVal = compTupleFilter.getFirstValue();
- if (Primitives.isWrapperType(clazz))
- targetVal = clazz.cast(clazz.getDeclaredMethod("valueOf", String.class).invoke(null, compTupleFilter.getFirstValue()));
-
- int comp = ((Comparable) computedVal).compareTo(targetVal);
- boolean compResult = false;
- switch (compTupleFilter.getOperator()) {
- case EQ:
- compResult = comp == 0;
- break;
- case NEQ:
- compResult = comp != 0;
- break;
- case LT:
- compResult = comp < 0;
- break;
- case LTE:
- compResult = comp <= 0;
- break;
- case GT:
- compResult = comp > 0;
- break;
- case GTE:
- compResult = comp >= 0;
- break;
- case IN:
- compResult = compTupleFilter.getValues().contains(computedVal.toString());
- break;
- case NOTIN:
- compResult = !compTupleFilter.getValues().contains(computedVal.toString());
- break;
- default:
- break;
- }
- if (compResult) {
- translated.addChild(new ConstantTupleFilter(dictVal));
- }
- }
- } catch (Exception e) {
- logger.debug(e.getMessage());
- return null;
- }
- return translated;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/3f5074ee/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterFunctionTranslator.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterFunctionTranslator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterFunctionTranslator.java
new file mode 100644
index 0000000..1c96dd4
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterFunctionTranslator.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import java.util.ListIterator;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.FunctionTupleFilter;
+import org.apache.kylin.metadata.filter.ITupleFilterTranslator;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.primitives.Primitives;
+
+/**
+ * only take effect when the compare filter has function
+ */
+public class TupleFilterFunctionTranslator implements ITupleFilterTranslator {
+ public static final Logger logger = LoggerFactory.getLogger(TupleFilterFunctionTranslator.class);
+
+ private IDictionaryAware dictionaryAware;
+
+ public TupleFilterFunctionTranslator(IDictionaryAware dictionaryAware) {
+ this.dictionaryAware = dictionaryAware;
+ }
+
+ @Override
+ public TupleFilter translate(TupleFilter tupleFilter) {
+ TupleFilter translated = null;
+ if (tupleFilter instanceof CompareTupleFilter) {
+ translated = translateCompareTupleFilter((CompareTupleFilter) tupleFilter);
+ if (translated != null) {
+ logger.info("Translated {" + tupleFilter + "} to IN clause: {" + translated + "}");
+ }
+ } else if (tupleFilter instanceof FunctionTupleFilter) {
+ translated = translateFunctionTupleFilter((FunctionTupleFilter) tupleFilter);
+ if (translated != null) {
+ logger.info("Translated {" + tupleFilter + "} to IN clause: {" + translated + "}");
+ }
+ } else if (tupleFilter instanceof LogicalTupleFilter) {
+ ListIterator<TupleFilter> childIterator = (ListIterator<TupleFilter>) tupleFilter.getChildren().listIterator();
+ while (childIterator.hasNext()) {
+ TupleFilter tempTranslated = translate(childIterator.next());
+ if (tempTranslated != null)
+ childIterator.set(tempTranslated);
+ }
+ }
+ return translated == null ? tupleFilter : translated;
+ }
+
+ private TupleFilter translateFunctionTupleFilter(FunctionTupleFilter functionTupleFilter) {
+ if (!functionTupleFilter.isValid())
+ return null;
+
+ TblColRef columnRef = functionTupleFilter.getColumn();
+ Dictionary<?> dict = dictionaryAware.getDictionary(columnRef);
+ if (dict == null)
+ return null;
+
+ CompareTupleFilter translated = new CompareTupleFilter(FilterOperatorEnum.IN);
+ translated.addChild(new ColumnTupleFilter(columnRef));
+
+ try {
+ for (int i = dict.getMinId(); i <= dict.getMaxId(); i++) {
+ Object dictVal = dict.getValueFromId(i);
+ if ((Boolean) functionTupleFilter.invokeFunction(dictVal)) {
+ translated.addChild(new ConstantTupleFilter(dictVal));
+ }
+ }
+ } catch (Exception e) {
+ logger.debug(e.getMessage());
+ return null;
+ }
+ return translated;
+ }
+
+ @SuppressWarnings("unchecked")
+ private TupleFilter translateCompareTupleFilter(CompareTupleFilter compTupleFilter) {
+ if (compTupleFilter.getFunction() == null)
+ return null;
+
+ FunctionTupleFilter functionTupleFilter = compTupleFilter.getFunction();
+ if (!functionTupleFilter.isValid())
+ return null;
+
+ TblColRef columnRef = functionTupleFilter.getColumn();
+ Dictionary<?> dict = dictionaryAware.getDictionary(columnRef);
+ if (dict == null)
+ return null;
+
+ CompareTupleFilter translated = new CompareTupleFilter(FilterOperatorEnum.IN);
+ translated.addChild(new ColumnTupleFilter(columnRef));
+
+ try {
+ for (int i = dict.getMinId(); i <= dict.getMaxId(); i++) {
+ Object dictVal = dict.getValueFromId(i);
+ Object computedVal = functionTupleFilter.invokeFunction(dictVal);
+ Class clazz = Primitives.wrap(computedVal.getClass());
+ Object targetVal = compTupleFilter.getFirstValue();
+ if (Primitives.isWrapperType(clazz))
+ targetVal = clazz.cast(clazz.getDeclaredMethod("valueOf", String.class).invoke(null, compTupleFilter.getFirstValue()));
+
+ int comp = ((Comparable) computedVal).compareTo(targetVal);
+ boolean compResult = false;
+ switch (compTupleFilter.getOperator()) {
+ case EQ:
+ compResult = comp == 0;
+ break;
+ case NEQ:
+ compResult = comp != 0;
+ break;
+ case LT:
+ compResult = comp < 0;
+ break;
+ case LTE:
+ compResult = comp <= 0;
+ break;
+ case GT:
+ compResult = comp > 0;
+ break;
+ case GTE:
+ compResult = comp >= 0;
+ break;
+ case IN:
+ compResult = compTupleFilter.getValues().contains(computedVal.toString());
+ break;
+ case NOTIN:
+ compResult = !compTupleFilter.getValues().contains(computedVal.toString());
+ break;
+ default:
+ break;
+ }
+ if (compResult) {
+ translated.addChild(new ConstantTupleFilter(dictVal));
+ }
+ }
+ } catch (Exception e) {
+ logger.debug(e.getMessage());
+ return null;
+ }
+ return translated;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/3f5074ee/core-metadata/src/main/java/org/apache/kylin/metadata/filter/FunctionTupleFilter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/FunctionTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/FunctionTupleFilter.java
index 15fcb72..30bef97 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/FunctionTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/FunctionTupleFilter.java
@@ -35,9 +35,6 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.google.common.primitives.Primitives;
-/**
- * Created by dongli on 11/11/15.
- */
public class FunctionTupleFilter extends TupleFilter {
public static final Logger logger = LoggerFactory.getLogger(FunctionTupleFilter.class);
@@ -79,7 +76,7 @@ public class FunctionTupleFilter extends TupleFilter {
if (columnContainerFilter instanceof ColumnTupleFilter)
methodParams.set(colPosition, (Serializable) input);
else if (columnContainerFilter instanceof FunctionTupleFilter)
- methodParams.set(colPosition, (Serializable) ((FunctionTupleFilter) columnContainerFilter).invokeFunction((Serializable) input));
+ methodParams.set(colPosition, (Serializable) ((FunctionTupleFilter) columnContainerFilter).invokeFunction(input));
return method.invoke(null, (Object[]) (methodParams.toArray()));
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/3f5074ee/core-metadata/src/main/java/org/apache/kylin/metadata/filter/function/BuiltInMethod.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/function/BuiltInMethod.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/function/BuiltInMethod.java
index b927d8d..7b241cc 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/function/BuiltInMethod.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/function/BuiltInMethod.java
@@ -18,29 +18,21 @@
package org.apache.kylin.metadata.filter.function;
-import com.google.common.collect.ImmutableMap;
-import org.apache.commons.lang3.reflect.MethodUtils;
-
import java.lang.reflect.Method;
import java.util.regex.Pattern;
-/**
- * Created by dongli on 11/13/15.
- */
+import org.apache.commons.lang3.reflect.MethodUtils;
+
+import com.google.common.collect.ImmutableMap;
+
public enum BuiltInMethod {
- UPPER(BuiltInMethod.class, "upper", String.class),
- LOWER(BuiltInMethod.class, "lower", String.class),
- SUBSTRING(BuiltInMethod.class, "substring", String.class, int.class, int.class),
- CHAR_LENGTH(BuiltInMethod.class, "charLength", String.class),
- LIKE(BuiltInMethod.class, "like", String.class, String.class),
- INITCAP(BuiltInMethod.class, "initcap", String.class);
+ UPPER(BuiltInMethod.class, "upper", String.class), LOWER(BuiltInMethod.class, "lower", String.class), SUBSTRING(BuiltInMethod.class, "substring", String.class, int.class, int.class), CHAR_LENGTH(BuiltInMethod.class, "charLength", String.class), LIKE(BuiltInMethod.class, "like", String.class, String.class), INITCAP(BuiltInMethod.class, "initcap", String.class);
public final Method method;
public static final ImmutableMap<String, BuiltInMethod> MAP;
static {
- final ImmutableMap.Builder<String, BuiltInMethod> builder =
- ImmutableMap.builder();
+ final ImmutableMap.Builder<String, BuiltInMethod> builder = ImmutableMap.builder();
for (BuiltInMethod value : BuiltInMethod.values()) {
if (value.method != null) {
builder.put(value.name(), value);
@@ -70,22 +62,22 @@ public enum BuiltInMethod {
for (int i = 0; i < len; i++) {
char curCh = s.charAt(i);
final int c = (int) curCh;
- if (start) { // curCh is whitespace or first character of word.
+ if (start) { // curCh is whitespace or first character of word.
if (c > 47 && c < 58) { // 0-9
start = false;
- } else if (c > 64 && c < 91) { // A-Z
+ } else if (c > 64 && c < 91) { // A-Z
start = false;
- } else if (c > 96 && c < 123) { // a-z
+ } else if (c > 96 && c < 123) { // a-z
start = false;
curCh = (char) (c - 32); // Uppercase this character
}
// else {} whitespace
- } else { // Inside of a word or white space after end of word.
+ } else { // Inside of a word or white space after end of word.
if (c > 47 && c < 58) { // 0-9
// noop
- } else if (c > 64 && c < 91) { // A-Z
+ } else if (c > 64 && c < 91) { // A-Z
curCh = (char) (c + 32); // Lowercase this character
- } else if (c > 96 && c < 123) { // a-z
+ } else if (c > 96 && c < 123) { // a-z
// noop
} else { // whitespace
start = true;
@@ -116,5 +108,4 @@ public enum BuiltInMethod {
return s.toLowerCase();
}
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/3f5074ee/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedQuery.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedQuery.java b/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedQuery.java
index 6ba76c4..a5ca800 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedQuery.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedQuery.java
@@ -5,9 +5,6 @@ import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Element;
import net.sf.ehcache.Status;
import net.sf.ehcache.config.CacheConfiguration;
-import net.sf.ehcache.config.Configuration;
-import net.sf.ehcache.config.PersistenceConfiguration;
-import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.metadata.realization.StreamSQLDigest;
@@ -21,6 +18,7 @@ import org.slf4j.LoggerFactory;
*/
public abstract class AbstractCacheFledgedQuery implements IStorageQuery, TeeTupleItrListener {
private static final Logger logger = LoggerFactory.getLogger(AbstractCacheFledgedQuery.class);
+
private static final String storageCacheTemplate = "StorageCache";
protected static CacheManager CACHE_MANAGER;
@@ -37,31 +35,6 @@ public abstract class AbstractCacheFledgedQuery implements IStorageQuery, TeeTup
CACHE_MANAGER = cacheManager;
}
- /**
- * This method is only useful non-spring injected test cases.
- * When Kylin is normally ran as a spring app CACHE_MANAGER will be injected.
- * and the configuration for cache lies in server/src/main/resources/ehcache.xml
- *
- * the cache named "StorageCache" acts like a template for each realization to
- * create its own cache.
- */
- private static void initCacheManger() {
- Configuration conf = new Configuration();
- conf.setMaxBytesLocalHeap("128M");
- CACHE_MANAGER = CacheManager.create(conf);
-
- //a fake template for test cases
- Cache storageCache = new Cache(new CacheConfiguration(storageCacheTemplate, 0).//
- memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU).//
- eternal(false).//
- timeToIdleSeconds(86400).//
- diskExpiryThreadIntervalSeconds(0).//
- //maxBytesLocalHeap(10, MemoryUnit.MEGABYTES).//
- persistence(new PersistenceConfiguration().strategy(PersistenceConfiguration.Strategy.NONE)));
-
- CACHE_MANAGER.addCacheIfAbsent(storageCache);
- }
-
protected StreamSQLResult getStreamSQLResult(StreamSQLDigest streamSQLDigest) {
Cache cache = CACHE_MANAGER.getCache(this.underlyingStorage.getStorageUUID());
@@ -87,8 +60,7 @@ public abstract class AbstractCacheFledgedQuery implements IStorageQuery, TeeTup
private void makeCacheIfNecessary(String storageUUID) {
if (CACHE_MANAGER == null || (!(CACHE_MANAGER.getStatus().equals(Status.STATUS_ALIVE)))) {
- logger.warn("CACHE_MANAGER is not provided or not alive");
- initCacheManger();
+ throw new RuntimeException("CACHE_MANAGER is not provided or not alive");
}
if (CACHE_MANAGER.getCache(storageUUID) == null) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/3f5074ee/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java b/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java
index 53e5f5b..3193bbb 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java
@@ -5,6 +5,7 @@ import java.util.IdentityHashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import net.sf.ehcache.CacheManager;
import org.apache.commons.lang.NotImplementedException;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.DateFormat;
@@ -20,6 +21,7 @@ import org.apache.kylin.metadata.tuple.Tuple;
import org.apache.kylin.metadata.tuple.TupleInfo;
import org.apache.kylin.storage.ICachableStorageQuery;
import org.apache.kylin.storage.StorageContext;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -32,10 +34,21 @@ import com.google.common.collect.Ranges;
*/
public class DynamicCacheTest {
+ private static CacheManager cacheManager;
+
@BeforeClass
- public static void setup() {
+ public static void setupResource() throws Exception {
System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
KylinConfig.getInstanceFromEnv().setProperty("kylin.query.cache.threshold.duration", "0");
+
+ cacheManager = CacheManager.newInstance("../server/src/main/resources/ehcache-test.xml");
+ AbstractCacheFledgedQuery.setCacheManager(cacheManager);
+ }
+
+ @AfterClass
+ public static void tearDownResource() {
+ cacheManager.shutdown();
+ AbstractCacheFledgedQuery.setCacheManager(null);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/3f5074ee/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java b/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java
index 182091b..b1665df 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java
@@ -6,6 +6,7 @@ import java.util.IdentityHashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import net.sf.ehcache.CacheManager;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.IdentityUtils;
import org.apache.kylin.metadata.filter.TupleFilter;
@@ -20,6 +21,7 @@ import org.apache.kylin.metadata.tuple.Tuple;
import org.apache.kylin.metadata.tuple.TupleInfo;
import org.apache.kylin.storage.ICachableStorageQuery;
import org.apache.kylin.storage.StorageContext;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -30,12 +32,25 @@ import com.google.common.collect.Range;
/**
*/
public class StaticCacheTest {
+
+ private static CacheManager cacheManager;
+
@BeforeClass
- public static void setup() {
+ public static void setupResource() throws Exception {
+
System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
-
+
KylinConfig config = KylinConfig.getInstanceFromEnv();
config.setProperty("kylin.query.cache.threshold.duration", "0");
+
+ cacheManager = CacheManager.newInstance("../server/src/main/resources/ehcache-test.xml");
+ AbstractCacheFledgedQuery.setCacheManager(cacheManager);
+ }
+
+ @AfterClass
+ public static void tearDownResource() {
+ cacheManager.shutdown();
+ AbstractCacheFledgedQuery.setCacheManager(null);
}
@Test
http://git-wip-us.apache.org/repos/asf/kylin/blob/3f5074ee/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
index d6443e7..c253770 100644
--- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
@@ -18,12 +18,14 @@
package org.apache.kylin.storage.hbase;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import net.sf.ehcache.CacheManager;
+
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.cube.CubeInstance;
@@ -38,6 +40,7 @@ import org.apache.kylin.metadata.tuple.ITupleIterator;
import org.apache.kylin.storage.IStorageQuery;
import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.StorageFactory;
+import org.apache.kylin.storage.cache.AbstractCacheFledgedQuery;
import org.apache.kylin.storage.cache.StorageMockUtils;
import org.apache.kylin.storage.exception.ScanOutOfLimitException;
import org.junit.After;
@@ -54,12 +57,18 @@ public class ITStorageTest extends HBaseMetadataTestCase {
private CubeInstance cube;
private StorageContext context;
+ private static CacheManager cacheManager;
+
@BeforeClass
public static void setupResource() throws Exception {
+ cacheManager = CacheManager.newInstance("../server/src/main/resources/ehcache-test.xml");
+ AbstractCacheFledgedQuery.setCacheManager(cacheManager);
}
@AfterClass
public static void tearDownResource() {
+ cacheManager.shutdown();
+ AbstractCacheFledgedQuery.setCacheManager(null);
}
@Before
http://git-wip-us.apache.org/repos/asf/kylin/blob/3f5074ee/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java
index 294f399..01d3041 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java
@@ -8,7 +8,7 @@ import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.kv.RowKeyColumnIO;
import org.apache.kylin.dict.DictCodeSystem;
import org.apache.kylin.dict.IDictionaryAware;
-import org.apache.kylin.dict.TupleFilterDictionaryTranslater;
+import org.apache.kylin.dict.TupleFilterFunctionTranslator;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
import org.apache.kylin.metadata.filter.ConstantTupleFilter;
@@ -131,7 +131,7 @@ public class FilterDecorator implements TupleFilterSerializer.Decorator {
if (filter == null)
return null;
- ITupleFilterTranslator translator = new TupleFilterDictionaryTranslater(columnIO.getIDictionaryAware());
+ ITupleFilterTranslator translator = new TupleFilterFunctionTranslator(columnIO.getIDictionaryAware());
filter = translator.translate(filter);
// un-evaluatable filter is replaced with TRUE
http://git-wip-us.apache.org/repos/asf/kylin/blob/3f5074ee/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
index abfb74d..e96c602 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
@@ -23,7 +23,7 @@ import org.apache.kylin.cube.gridtable.CubeGridTable;
import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
import org.apache.kylin.cube.gridtable.NotEnoughGTInfoException;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.TupleFilterDictionaryTranslater;
+import org.apache.kylin.dict.TupleFilterFunctionTranslator;
import org.apache.kylin.gridtable.EmptyGTScanner;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
@@ -64,7 +64,7 @@ public class CubeSegmentScanner implements IGTScanner {
CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
// translate FunctionTupleFilter to IN clause
- ITupleFilterTranslator translator = new TupleFilterDictionaryTranslater(this.cubeSeg);
+ ITupleFilterTranslator translator = new TupleFilterFunctionTranslator(this.cubeSeg);
filter = translator.translate(filter);
//replace the constant values in filter to dictionary codes