You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/06/22 01:34:20 UTC

[19/50] [abbrv] carbondata git commit: [HOTFIX][CARBONDATA-2591] Fix SDK CarbonReader filter issue

[HOTFIX][CARBONDATA-2591] Fix SDK CarbonReader filter issue

There are some issues in the SDK CarbonReader filter function; please check the JIRA.

This closes #2363


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/290ef5a3
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/290ef5a3
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/290ef5a3

Branch: refs/heads/carbonstore
Commit: 290ef5a3a90081b3c95ea0dc418f643ea5ad694f
Parents: 0ef7e55
Author: xubo245 <xu...@huawei.com>
Authored: Thu Jun 7 22:00:31 2018 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Jun 12 10:54:04 2018 +0530

----------------------------------------------------------------------
 .../core/metadata/schema/table/CarbonTable.java |  35 +++
 .../apache/carbondata/core/util/CarbonUtil.java |   1 +
 .../sdk/file/CarbonReaderBuilder.java           |   6 +-
 .../carbondata/sdk/file/CarbonReaderTest.java   | 251 ++++++++++++++++++-
 .../apache/carbondata/sdk/file/TestUtil.java    |  14 ++
 5 files changed, 304 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/290ef5a3/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 6949643..20bc7a1 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.core.metadata.schema.table;
 
+import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -57,6 +59,8 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
+import static org.apache.carbondata.core.util.CarbonUtil.thriftColumnSchemaToWrapperColumnSchema;
+
 /**
  * Mapping class for Carbon actual table
  */
@@ -218,6 +222,52 @@ public class CarbonTable implements Serializable {
     }
   }
 
+  /**
+   * Builds a {@link CarbonTable} for a non-transactional table by inferring its column schema
+   * from the first carbon index file found directly under {@code tablePath}.
+   * <p>
+   * Unlike {@link #buildDummyTable(String)}, the fact table of the returned table has its
+   * column list populated from the index file. Index files are listed through
+   * {@link java.io.File}, so {@code tablePath} must be a local file-system directory.
+   *
+   * @param tablePath directory containing the carbon index files
+   * @param tableName table name passed to the schema inference
+   * @return a table whose columns come from the inferred schema
+   * @throws IOException propagated from the schema-inference and table-building calls
+   * @throws RuntimeException if no carbon index file exists under {@code tablePath}
+   */
+  public static CarbonTable buildTable(
+      String tablePath,
+      String tableName) throws IOException {
+    TableInfo tableInfoInfer = CarbonUtil.buildDummyTableInfo(tablePath, "null", "null");
+    // listFiles returns null when tablePath is not a readable directory.
+    File[] indexFiles = new File(tablePath).listFiles(new FilenameFilter() {
+      @Override
+      public boolean accept(File dir, String name) {
+        return name.endsWith(CarbonTablePath.INDEX_FILE_EXT);
+      }
+    });
+    if (indexFiles == null || indexFiles.length == 0) {
+      throw new RuntimeException("Carbon index file not exists.");
+    }
+    // Only the first index file is read; all index files are assumed to share one schema.
+    org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil
+        .inferSchemaFromIndexFile(indexFiles[0].toString(), tableName);
+    List<org.apache.carbondata.format.ColumnSchema> thriftColumns =
+        tableInfo.getFact_table().getTable_columns();
+    List<ColumnSchema> columnSchemaList = new ArrayList<>(thriftColumns.size());
+    for (org.apache.carbondata.format.ColumnSchema thriftColumnSchema : thriftColumns) {
+      ColumnSchema columnSchema = thriftColumnSchemaToWrapperColumnSchema(thriftColumnSchema);
+      // Fall back to the unique id when the index file carries no column reference id.
+      if (columnSchema.getColumnReferenceId() == null) {
+        columnSchema.setColumnReferenceId(columnSchema.getColumnUniqueId());
+      }
+      columnSchemaList.add(columnSchema);
+    }
+    tableInfoInfer.getFactTable().setListOfColumns(columnSchemaList);
+    return CarbonTable.buildFromTableInfo(tableInfoInfer);
+  }
+
   public static CarbonTable buildDummyTable(String tablePath) throws IOException {
     TableInfo tableInfoInfer = CarbonUtil.buildDummyTableInfo(tablePath, "null", "null");
     return CarbonTable.buildFromTableInfo(tableInfoInfer);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/290ef5a3/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index e1e5e16..2aa4a05 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2209,6 +2209,7 @@ public final class CarbonUtil {
       org.apache.carbondata.format.ColumnSchema externalColumnSchema) {
     ColumnSchema wrapperColumnSchema = new ColumnSchema();
     wrapperColumnSchema.setColumnUniqueId(externalColumnSchema.getColumn_id());
+    wrapperColumnSchema.setColumnReferenceId(externalColumnSchema.getColumnReferenceId());
     wrapperColumnSchema.setColumnName(externalColumnSchema.getColumn_name());
     wrapperColumnSchema.setColumnar(externalColumnSchema.isColumnar());
     DataType dataType = thriftDataTyopeToWrapperDataType(externalColumnSchema.data_type);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/290ef5a3/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index 98aa6e0..83cb34e 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -179,7 +179,11 @@ public class CarbonReaderBuilder {
     if (isTransactionalTable) {
       table = CarbonTable.buildFromTablePath(tableName, "default", tablePath);
     } else {
-      table = CarbonTable.buildDummyTable(tablePath);
+      if (filterExpression != null) {
+        table = CarbonTable.buildTable(tablePath, tableName);
+      } else {
+        table = CarbonTable.buildDummyTable(tablePath);
+      }
     }
     final CarbonFileInputFormat format = new CarbonFileInputFormat();
     final Job job = new Job(new Configuration());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/290ef5a3/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index a8aa795..fb2e2bc 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -20,8 +20,7 @@ package org.apache.carbondata.sdk.file;
 import java.io.*;
 import java.sql.Date;
 import java.sql.Timestamp;
-import java.util.Arrays;
-import java.util.Comparator;
+import java.util.*;
 
 import org.apache.avro.generic.GenericData;
 import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
@@ -29,6 +28,11 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.expression.logical.OrExpression;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
@@ -48,6 +52,12 @@ public class CarbonReaderTest extends TestCase {
   @After
   public void verifyDMFile() {
     assert (!TestUtil.verifyMdtFile());
+    // Remove the shared output directory; a failed delete is only reported, not rethrown.
+    try {
+      FileUtils.deleteDirectory(new File("./testWriteFiles"));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
   }
 
   @Test
@@ -106,6 +116,243 @@ public class CarbonReaderTest extends TestCase {
   }
 
   @Test
+  public void testReadWithFilterOfTransactional() throws IOException, InterruptedException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+    // Two-column schema: name (STRING), age (INT).
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    TestUtil.writeFilesAndVerify(200, new Schema(fields), path, true);
+    // Filter: name = 'robot1'
+    EqualToExpression equalToExpression = new EqualToExpression(
+        new ColumnExpression("name", DataTypes.STRING),
+        new LiteralExpression("robot1", DataTypes.STRING));
+    CarbonReader reader = CarbonReader
+        .builder(path, "_temp")
+        .isTransactionalTable(true)
+        .projection(new String[]{"name", "age"})
+        .filter(equalToExpression)
+        .build();
+
+    int i = 0;
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      // Every row returned must satisfy the filter.
+      Assert.assertEquals("robot1", row[0]);
+      i++;
+    }
+    Assert.assertEquals(20, i);
+
+    reader.close();
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testReadWithFilterOfTransactionalAnd() throws IOException, InterruptedException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+    // Three-column schema: name (STRING), age (INT), doubleField (DOUBLE).
+    Field[] fields = new Field[3];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+
+    TestUtil.writeFilesAndVerify(200, new Schema(fields), path, true);
+    // Filter: doubleField = 3.5 AND name = 'robot7'
+    ColumnExpression columnExpression = new ColumnExpression("doubleField", DataTypes.DOUBLE);
+    EqualToExpression equalToExpression = new EqualToExpression(columnExpression,
+        new LiteralExpression("3.5", DataTypes.DOUBLE));
+
+    ColumnExpression columnExpression2 = new ColumnExpression("name", DataTypes.STRING);
+    EqualToExpression equalToExpression2 = new EqualToExpression(columnExpression2,
+        new LiteralExpression("robot7", DataTypes.STRING));
+
+    AndExpression andExpression = new AndExpression(equalToExpression, equalToExpression2);
+    CarbonReader reader = CarbonReader
+        .builder(path, "_temp")
+        .isTransactionalTable(true)
+        .projection(new String[]{"name", "age", "doubleField"})
+        .filter(andExpression)
+        .build();
+
+    int i = 0;
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      Assert.assertTrue(((String) row[0]).contains("robot7"));
+      Assert.assertEquals(7, (int) row[1]);
+      Assert.assertEquals(3.5, (double) row[2], 0.0);
+      i++;
+    }
+    Assert.assertEquals(1, i);
+
+    reader.close();
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testReadWithFilterOfNonTransactionalSimple() throws IOException, InterruptedException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+    // Two-column schema: name (STRING), age (INT).
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    // Write 200 rows with persistSchema = false and isTransactionalTable = false.
+    TestUtil.writeFilesAndVerify(200, new Schema(fields), path, false, false);
+    // Filter: name = 'robot1'
+    ColumnExpression columnExpression = new ColumnExpression("name", DataTypes.STRING);
+    EqualToExpression equalToExpression = new EqualToExpression(columnExpression,
+        new LiteralExpression("robot1", DataTypes.STRING));
+
+    CarbonReader reader = CarbonReader
+        .builder(path, "_temp")
+        .isTransactionalTable(false)
+        .projection(new String[]{"name", "age"})
+        .filter(equalToExpression)
+        .build();
+
+    int i = 0;
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      // Every row returned must satisfy the filter.
+      Assert.assertEquals("robot1", row[0]);
+      i++;
+    }
+    Assert.assertEquals(20, i);
+
+    reader.close();
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testReadWithFilterOfNonTransactional2() throws IOException, InterruptedException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+    // Two-column schema: name (STRING), age (INT).
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    // Write 200 rows with persistSchema = false and isTransactionalTable = false.
+    TestUtil.writeFilesAndVerify(200, new Schema(fields), path, false, false);
+    // Filter on a non-string column: age = 1
+    ColumnExpression columnExpression = new ColumnExpression("age", DataTypes.INT);
+
+    EqualToExpression equalToExpression = new EqualToExpression(columnExpression,
+        new LiteralExpression("1", DataTypes.INT));
+    CarbonReader reader = CarbonReader
+        .builder(path, "_temp")
+        .isTransactionalTable(false)
+        .projection(new String[]{"name", "age"})
+        .filter(equalToExpression)
+        .build();
+
+    int i = 0;
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      // Rows returned must match age = 1.
+      Assert.assertTrue(((String) row[0]).contains("robot"));
+      Assert.assertEquals(1, (int) row[1]);
+      i++;
+    }
+    Assert.assertEquals(1, i);
+
+    reader.close();
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testReadWithFilterOfNonTransactionalAnd() throws IOException, InterruptedException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+    // Three-column schema: name (STRING), age (INT), doubleField (DOUBLE).
+    Field[] fields = new Field[3];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+    // Write 200 rows with persistSchema = false and isTransactionalTable = false.
+    TestUtil.writeFilesAndVerify(200, new Schema(fields), path, false, false);
+    // Filter: doubleField = 3.5 AND name = 'robot7'
+    ColumnExpression columnExpression = new ColumnExpression("doubleField", DataTypes.DOUBLE);
+    EqualToExpression equalToExpression = new EqualToExpression(columnExpression,
+        new LiteralExpression("3.5", DataTypes.DOUBLE));
+
+    ColumnExpression columnExpression2 = new ColumnExpression("name", DataTypes.STRING);
+    EqualToExpression equalToExpression2 = new EqualToExpression(columnExpression2,
+        new LiteralExpression("robot7", DataTypes.STRING));
+
+    AndExpression andExpression = new AndExpression(equalToExpression, equalToExpression2);
+    CarbonReader reader = CarbonReader
+        .builder(path, "_temp")
+        .isTransactionalTable(false)
+        .projection(new String[]{"name", "age", "doubleField"})
+        .filter(andExpression)
+        .build();
+
+    int i = 0;
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      Assert.assertTrue(((String) row[0]).contains("robot7"));
+      Assert.assertEquals(7, (int) row[1]);
+      Assert.assertEquals(3.5, (double) row[2], 0.0);
+      i++;
+    }
+    Assert.assertEquals(1, i);
+
+    reader.close();
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testReadWithFilterOfNonTransactionalOr() throws IOException, InterruptedException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+    // Three-column schema: name (STRING), age (INT), doubleField (DOUBLE).
+    Field[] fields = new Field[3];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+    // Write 200 rows with persistSchema = false and isTransactionalTable = false.
+    TestUtil.writeFilesAndVerify(200, new Schema(fields), path, false, false);
+    // Filter: doubleField = 3.5 OR name = 'robot7'
+    ColumnExpression columnExpression = new ColumnExpression("doubleField", DataTypes.DOUBLE);
+    EqualToExpression equalToExpression = new EqualToExpression(columnExpression,
+        new LiteralExpression("3.5", DataTypes.DOUBLE));
+
+    ColumnExpression columnExpression2 = new ColumnExpression("name", DataTypes.STRING);
+    EqualToExpression equalToExpression2 = new EqualToExpression(columnExpression2,
+        new LiteralExpression("robot7", DataTypes.STRING));
+
+    OrExpression orExpression = new OrExpression(equalToExpression, equalToExpression2);
+    CarbonReader reader = CarbonReader
+        .builder(path, "_temp")
+        .isTransactionalTable(false)
+        .projection(new String[]{"name", "age", "doubleField"})
+        .filter(orExpression)
+        .build();
+
+    int i = 0;
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      Assert.assertTrue(((String) row[0]).contains("robot7"));
+      Assert.assertEquals(7, (int) row[1] % 10);
+      Assert.assertEquals(0.5, (double) row[2] % 1, 0.0);
+      i++;
+    }
+    Assert.assertEquals(20, i);
+
+    reader.close();
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
   public void testReadColumnTwice() throws IOException, InterruptedException {
     String path = "./testWriteFiles";
     FileUtils.deleteDirectory(new File(path));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/290ef5a3/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
index 0f00d61..919472c 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
@@ -49,6 +49,20 @@ public class TestUtil {
   }
 
   /**
+   * Writes and verifies files via the full overload, using null/-1 for the omitted arguments.
+   *
+   * @param rows                 number of rows to write
+   * @param schema               schema of the written rows
+   * @param path                 table store path
+   * @param persistSchema        whether to persist the schema
+   * @param isTransactionalTable whether to write a transactional table
+   */
+  public static void writeFilesAndVerify(int rows, Schema schema, String path,
+      boolean persistSchema, boolean isTransactionalTable) {
+    writeFilesAndVerify(rows, schema, path, null, persistSchema, -1, -1, isTransactionalTable);
+  }
+
+  /**
    * Invoke CarbonWriter API to write carbon files and assert the file is rewritten
    * @param rows number of rows to write
    * @param schema schema of the file