You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/09/26 07:02:03 UTC
carbondata git commit: [CARBONDATA-2969] Local dictionary query fix
for Spark 2.3
Repository: carbondata
Updated Branches:
refs/heads/master f2398948c -> 2ab2254be
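[CARBONDATA-2969] Local dictionary query fix for Spark 2.3
On Spark 2.3 the vector proxy only attached a dictionary that was an instance of Spark's own vectorized Dictionary, but it was handed the Parquet-based CarbonDictionaryWrapper, so the local dictionary was never attached (setDictionary(null) was always used). CarbonVectorProxy now receives the CarbonDictionary directly and builds the Spark-version-specific wrapper itself.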
This closes #2761
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2ab2254b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2ab2254b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2ab2254b
Branch: refs/heads/master
Commit: 2ab2254be84f82fd2f4b99a6b73353f4c7a55d10
Parents: f239894
Author: akashrn5 <ak...@gmail.com>
Authored: Tue Sep 25 20:43:06 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Sep 26 15:01:38 2018 +0800
----------------------------------------------------------------------
.../LocalDictionarySupportLoadTableTest.scala | 14 +++++
.../vectorreader/CarbonDictionaryWrapper.java | 44 ---------------
.../vectorreader/ColumnarVectorWrapper.java | 11 +---
.../spark/sql/CarbonDictionaryWrapper.java | 44 +++++++++++++++
.../org/apache/spark/sql/CarbonVectorProxy.java | 10 ++--
.../spark/sql/CarbonDictionaryWrapper.java | 56 ++++++++++++++++++++
.../org/apache/spark/sql/CarbonVectorProxy.java | 8 +--
7 files changed, 127 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ab2254b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala
index e88d8a9..d23c844 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala
@@ -136,6 +136,20 @@ class LocalDictionarySupportLoadTableTest extends QueryTest with BeforeAndAfterA
assert(checkForLocalDictionary(getDimRawChunk(2)))
}
+ test("test local dictionary data validation") { // query results must match with local dictionary enabled and disabled
+ sql("drop table if exists local_query_enable")
+ sql("drop table if exists local_query_disable")
+ sql(
+ "CREATE TABLE local_query_enable(name string) STORED BY 'carbondata' tblproperties" +
+ "('local_dictionary_enable'='true','local_dictionary_include'='name')")
+ sql("load data inpath '" + file1 + "' into table local_query_enable OPTIONS('header'='false')")
+ sql(
+ "CREATE TABLE local_query_disable(name string) STORED BY 'carbondata' tblproperties" +
+ "('local_dictionary_enable'='false','local_dictionary_include'='name')")
+ sql("load data inpath '" + file1 + "' into table local_query_disable OPTIONS('header'='false')")
+ checkAnswer(sql("select name from local_query_enable"), sql("select name from local_query_disable"))
+ }
+
test("test to validate local dictionary values"){
sql("drop table if exists local2")
sql("CREATE TABLE local2(name string) STORED BY 'carbondata' tblproperties('local_dictionary_enable'='true')")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ab2254b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/CarbonDictionaryWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/CarbonDictionaryWrapper.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/CarbonDictionaryWrapper.java
deleted file mode 100644
index 7f1e577..0000000
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/CarbonDictionaryWrapper.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.vectorreader;
-
-import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
-
-import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.column.Encoding;
-import org.apache.parquet.io.api.Binary;
-
-public class CarbonDictionaryWrapper extends Dictionary {
-
- private Binary[] binaries;
-
- CarbonDictionaryWrapper(Encoding encoding, CarbonDictionary dictionary) {
- super(encoding);
- binaries = new Binary[dictionary.getDictionarySize()];
- for (int i = 0; i < binaries.length; i++) {
- binaries[i] = Binary.fromReusedByteArray(dictionary.getDictionaryValue(i));
- }
- }
-
- @Override public int getMaxId() {
- return binaries.length - 1;
- }
-
- @Override public Binary decodeToBinary(int id) {
- return binaries[id];
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ab2254b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
index a0938da..6acf31f 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -269,16 +269,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
}
@Override public void setDictionary(CarbonDictionary dictionary) {
- if (dictionary == null) {
- sparkColumnVectorProxy.setDictionary(null, ordinal);
- } else {
- sparkColumnVectorProxy
- .setDictionary(new CarbonDictionaryWrapper(Encoding.PLAIN, dictionary),ordinal);
- }
- }
-
- private void setDictionaryType(boolean type) {
- this.isDictionary = type;
+ sparkColumnVectorProxy.setDictionary(dictionary, ordinal); // proxy builds the Spark-version-specific wrapper; null clears it
}
@Override public boolean hasDictionary() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ab2254b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonDictionaryWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonDictionaryWrapper.java b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonDictionaryWrapper.java
new file mode 100644
index 0000000..b7c6741
--- /dev/null
+++ b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonDictionaryWrapper.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql;
+
+import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.io.api.Binary;
+
+public class CarbonDictionaryWrapper extends Dictionary {
+
+ private final Binary[] binaries; // dictionary values, indexed by dictionary id
+
+ CarbonDictionaryWrapper(Encoding encoding, CarbonDictionary dictionary) {
+ super(encoding);
+ binaries = new Binary[dictionary.getDictionarySize()];
+ for (int i = 0; i < binaries.length; i++) {
+ binaries[i] = Binary.fromReusedByteArray(dictionary.getDictionaryValue(i));
+ }
+ }
+
+ @Override public int getMaxId() {
+ return binaries.length - 1;
+ }
+
+ @Override public Binary decodeToBinary(int id) {
+ return binaries[id];
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ab2254b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
index f39bc93..80e6dbd 100644
--- a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
+++ b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
@@ -18,7 +18,10 @@ package org.apache.spark.sql;
import java.math.BigInteger;
+import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+
import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
@@ -82,9 +85,10 @@ public class CarbonVectorProxy {
return columnarBatch.capacity();
}
- public void setDictionary(Object dictionary, int ordinal) {
- if (dictionary instanceof Dictionary) {
- columnarBatch.column(ordinal).setDictionary((Dictionary) dictionary);
+ public void setDictionary(CarbonDictionary dictionary, int ordinal) {
+ if (null != dictionary) {
+ columnarBatch.column(ordinal)
+ .setDictionary(new CarbonDictionaryWrapper(Encoding.PLAIN, dictionary));
} else {
columnarBatch.column(ordinal).setDictionary(null);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ab2254b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java
new file mode 100644
index 0000000..5a99c68
--- /dev/null
+++ b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql;
+
+import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+
+import org.apache.spark.sql.execution.vectorized.Dictionary;
+
+public class CarbonDictionaryWrapper implements Dictionary {
+
+ /**
+ * Dictionary values taken from the CarbonDictionary, indexed by dictionary id.
+ */
+ private final byte[][] binaries;
+
+ CarbonDictionaryWrapper(CarbonDictionary dictionary) {
+ binaries = new byte[dictionary.getDictionarySize()][];
+ for (int i = 0; i < binaries.length; i++) {
+ binaries[i] = dictionary.getDictionaryValue(i);
+ }
+ }
+
+ @Override public int decodeToInt(int id) {
+ throw new UnsupportedOperationException("Dictionary encoding does not support int");
+ }
+
+ @Override public long decodeToLong(int id) {
+ throw new UnsupportedOperationException("Dictionary encoding does not support long");
+ }
+
+ @Override public float decodeToFloat(int id) {
+ throw new UnsupportedOperationException("Dictionary encoding does not support float");
+ }
+
+ @Override public double decodeToDouble(int id) {
+ throw new UnsupportedOperationException("Dictionary encoding does not support double");
+ }
+
+ @Override public byte[] decodeToBinary(int id) {
+ return binaries[id];
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ab2254b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
index 0f23294..4a0fb9e 100644
--- a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
+++ b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
@@ -18,6 +18,8 @@ package org.apache.spark.sql;
import java.math.BigInteger;
+import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.Dictionary;
@@ -262,9 +264,9 @@ public class CarbonVectorProxy {
return columnVectors[ordinal].hasDictionary();
}
- public void setDictionary(Object dictionary, int ordinal) {
- if (dictionary instanceof Dictionary) {
- columnVectors[ordinal].setDictionary((Dictionary) dictionary);
+ public void setDictionary(CarbonDictionary dictionary, int ordinal) {
+ if (null != dictionary) {
+ columnVectors[ordinal].setDictionary(new CarbonDictionaryWrapper(dictionary));
} else {
columnVectors[ordinal].setDictionary(null);
}