You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/09/26 07:02:03 UTC

carbondata git commit: [CARBONDATA-2969] local dictionary query fix for spark-2.3

Repository: carbondata
Updated Branches:
  refs/heads/master f2398948c -> 2ab2254be


[CARBONDATA-2969] local dictionary query fix for spark-2.3

This closes #2761


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2ab2254b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2ab2254b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2ab2254b

Branch: refs/heads/master
Commit: 2ab2254be84f82fd2f4b99a6b73353f4c7a55d10
Parents: f239894
Author: akashrn5 <ak...@gmail.com>
Authored: Tue Sep 25 20:43:06 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Sep 26 15:01:38 2018 +0800

----------------------------------------------------------------------
 .../LocalDictionarySupportLoadTableTest.scala   | 14 +++++
 .../vectorreader/CarbonDictionaryWrapper.java   | 44 ---------------
 .../vectorreader/ColumnarVectorWrapper.java     | 11 +---
 .../spark/sql/CarbonDictionaryWrapper.java      | 44 +++++++++++++++
 .../org/apache/spark/sql/CarbonVectorProxy.java | 10 ++--
 .../spark/sql/CarbonDictionaryWrapper.java      | 56 ++++++++++++++++++++
 .../org/apache/spark/sql/CarbonVectorProxy.java |  8 +--
 7 files changed, 127 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ab2254b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala
index e88d8a9..d23c844 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala
@@ -136,6 +136,20 @@ class LocalDictionarySupportLoadTableTest extends QueryTest with BeforeAndAfterA
     assert(checkForLocalDictionary(getDimRawChunk(2)))
   }
 
+  test("test local dictionary data validation") { // results must match with local dictionary on/off
+    sql("drop table if exists local_query_enable")
+    sql("drop table if exists local_query_disable")
+    sql(
+      "CREATE TABLE local_query_enable(name string) STORED BY 'carbondata' tblproperties" +
+      "('local_dictionary_enable'='true','local_dictionary_include'='name')")
+    sql("load data inpath '" + file1 + "' into table local_query_enable OPTIONS('header'='false')")
+    sql(
+      "CREATE TABLE local_query_disable(name string) STORED BY 'carbondata' tblproperties" +
+      "('local_dictionary_enable'='false','local_dictionary_include'='name')")
+    sql("load data inpath '" + file1 + "' into table local_query_disable OPTIONS('header'='false')")
+    checkAnswer(sql("select name from local_query_enable"), sql("select name from local_query_disable"))
+  }
+
   test("test to validate local dictionary values"){
     sql("drop table if exists local2")
     sql("CREATE TABLE local2(name string) STORED BY 'carbondata' tblproperties('local_dictionary_enable'='true')")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ab2254b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/CarbonDictionaryWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/CarbonDictionaryWrapper.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/CarbonDictionaryWrapper.java
deleted file mode 100644
index 7f1e577..0000000
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/CarbonDictionaryWrapper.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.vectorreader;
-
-import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
-
-import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.column.Encoding;
-import org.apache.parquet.io.api.Binary;
-
-public class CarbonDictionaryWrapper extends Dictionary {
-
-  private Binary[] binaries;
-
-  CarbonDictionaryWrapper(Encoding encoding, CarbonDictionary dictionary) {
-    super(encoding);
-    binaries = new Binary[dictionary.getDictionarySize()];
-    for (int i = 0; i < binaries.length; i++) {
-      binaries[i] = Binary.fromReusedByteArray(dictionary.getDictionaryValue(i));
-    }
-  }
-
-  @Override public int getMaxId() {
-    return binaries.length - 1;
-  }
-
-  @Override public Binary decodeToBinary(int id) {
-    return binaries[id];
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ab2254b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
index a0938da..6acf31f 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -269,16 +269,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
   }
 
   @Override public void setDictionary(CarbonDictionary dictionary) {
-    if (dictionary == null) {
-      sparkColumnVectorProxy.setDictionary(null, ordinal);
-    } else {
-      sparkColumnVectorProxy
-          .setDictionary(new CarbonDictionaryWrapper(Encoding.PLAIN, dictionary),ordinal);
-    }
-  }
-
-  private void  setDictionaryType(boolean type) {
-    this.isDictionary = type;
+    sparkColumnVectorProxy.setDictionary(dictionary, ordinal); // proxy wraps it; null clears it
   }
 
   @Override public boolean hasDictionary() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ab2254b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonDictionaryWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonDictionaryWrapper.java b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonDictionaryWrapper.java
new file mode 100644
index 0000000..b7c6741
--- /dev/null
+++ b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonDictionaryWrapper.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql;
+
+import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.io.api.Binary;
+
+public class CarbonDictionaryWrapper extends Dictionary {
+  /** Dictionary values wrapped as Parquet binaries, indexed by dictionary id. */
+  private final Binary[] binaries;
+  /** Wraps every dictionary value as a constant Binary: the wrapper retains the arrays. */
+  CarbonDictionaryWrapper(Encoding encoding, CarbonDictionary dictionary) {
+    super(encoding);
+    binaries = new Binary[dictionary.getDictionarySize()];
+    for (int i = 0; i < binaries.length; i++) {
+      binaries[i] = Binary.fromConstantByteArray(dictionary.getDictionaryValue(i));
+    }
+  }
+  /** Returns the largest valid dictionary id (dictionary size minus one). */
+  @Override public int getMaxId() {
+    return binaries.length - 1;
+  }
+  /** Returns the value stored for the given dictionary id. */
+  @Override public Binary decodeToBinary(int id) {
+    return binaries[id];
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ab2254b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
index f39bc93..80e6dbd 100644
--- a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
+++ b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
@@ -18,7 +18,10 @@ package org.apache.spark.sql;
 
 import java.math.BigInteger;
 
+import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+
 import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
@@ -82,9 +85,10 @@ public class CarbonVectorProxy {
         return columnarBatch.capacity();
     }
 
-    public void setDictionary(Object dictionary, int ordinal) {
-        if (dictionary instanceof Dictionary) {
-            columnarBatch.column(ordinal).setDictionary((Dictionary) dictionary);
+    public void setDictionary(CarbonDictionary dictionary, int ordinal) {
+        if (null != dictionary) {
+            columnarBatch.column(ordinal)
+                .setDictionary(new CarbonDictionaryWrapper(Encoding.PLAIN, dictionary));
         } else {
             columnarBatch.column(ordinal).setDictionary(null);
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ab2254b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java
new file mode 100644
index 0000000..5a99c68
--- /dev/null
+++ b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql;
+
+import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+
+import org.apache.spark.sql.execution.vectorized.Dictionary;
+
+public class CarbonDictionaryWrapper implements Dictionary {
+
+  /**
+   * dictionary values, indexed by dictionary id (references to the carbon arrays, not copies)
+   */
+  private final byte[][] binaries;
+  /** Captures every value of {@code dictionary} so Spark can decode ids to bytes. */
+  CarbonDictionaryWrapper(CarbonDictionary dictionary) {
+    binaries = new byte[dictionary.getDictionarySize()][];
+    for (int i = 0; i < binaries.length; i++) {
+      binaries[i] = dictionary.getDictionaryValue(i);
+    }
+  }
+  /** Only binary values are exposed by this wrapper; the numeric decoders are unsupported. */
+  @Override public int decodeToInt(int id) {
+    throw new UnsupportedOperationException("Dictionary encoding does not support int");
+  }
+
+  @Override public long decodeToLong(int id) {
+    throw new UnsupportedOperationException("Dictionary encoding does not support long");
+  }
+
+  @Override public float decodeToFloat(int id) {
+    throw new UnsupportedOperationException("Dictionary encoding does not support float");
+  }
+
+  @Override public double decodeToDouble(int id) {
+    throw new UnsupportedOperationException("Dictionary encoding does not support double");
+  }
+
+  @Override public byte[] decodeToBinary(int id) {
+    return binaries[id];
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ab2254b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
index 0f23294..4a0fb9e 100644
--- a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
+++ b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
@@ -18,6 +18,8 @@ package org.apache.spark.sql;
 
 import java.math.BigInteger;
 
+import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.execution.vectorized.Dictionary;
@@ -262,9 +264,9 @@ public class CarbonVectorProxy {
         return columnVectors[ordinal].hasDictionary();
     }
 
-    public void setDictionary(Object dictionary, int ordinal) {
-        if (dictionary instanceof Dictionary) {
-            columnVectors[ordinal].setDictionary((Dictionary) dictionary);
+    public void setDictionary(CarbonDictionary dictionary, int ordinal) {
+        if (null != dictionary) {
+            columnVectors[ordinal].setDictionary(new CarbonDictionaryWrapper(dictionary));
         } else {
             columnVectors[ordinal].setDictionary(null);
         }