You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/06/30 07:45:17 UTC

carbondata git commit: [CARBONDATA-2629] Support SDK carbon reader read data from HDFS and S3 with filter function

Repository: carbondata
Updated Branches:
  refs/heads/master e30a84cc5 -> 459331c32


[CARBONDATA-2629] Support SDK carbon reader read data from HDFS and S3 with filter function

Currently the SDK carbon reader supports the filter function only when reading data from the local file system; it throws an exception when reading data from HDFS or S3 with a filter.
This PR adds that support:
The SDK carbon reader can now read data from HDFS and S3 with the filter function.

This closes #2399


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/459331c3
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/459331c3
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/459331c3

Branch: refs/heads/master
Commit: 459331c322d2a1a15628c8d55ff13fc62a760573
Parents: e30a84c
Author: xubo245 <xu...@huawei.com>
Authored: Fri Jun 22 19:51:50 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Sat Jun 30 15:45:05 2018 +0800

----------------------------------------------------------------------
 .../core/metadata/schema/table/CarbonTable.java |  29 +--
 .../carbondata/examples/sdk/SDKS3Example.java   |  21 +-
 .../carbondata/sdk/file/CarbonReaderTest.java   | 221 ++++++++++++++++++-
 3 files changed, 249 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/459331c3/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 710c7af..6f6ef81 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -17,8 +17,6 @@
 
 package org.apache.carbondata.core.metadata.schema.table;
 
-import java.io.File;
-import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -36,6 +34,9 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.features.TableOperation;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
@@ -236,20 +237,22 @@ public class CarbonTable implements Serializable {
       String tablePath,
       String tableName) throws IOException {
     TableInfo tableInfoInfer = CarbonUtil.buildDummyTableInfo(tablePath, "null", "null");
-    File[] dataFiles = new File(tablePath).listFiles(new FilenameFilter() {
-      @Override
-      public boolean accept(File dir, String name) {
-        if (name == null) {
-          return false;
-        }
-        return name.endsWith("carbonindex");
-      }
-    });
-    if (dataFiles == null || dataFiles.length < 1) {
+    CarbonFile[] carbonFiles = FileFactory
+        .getCarbonFile(tablePath)
+        .listFiles(new CarbonFileFilter() {
+          @Override
+          public boolean accept(CarbonFile file) {
+            if (file == null) {
+              return false;
+            }
+            return file.getName().endsWith("carbonindex");
+          }
+        });
+    if (carbonFiles == null || carbonFiles.length < 1) {
       throw new RuntimeException("Carbon index file not exists.");
     }
     org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil
-        .inferSchemaFromIndexFile(dataFiles[0].toString(), tableName);
+        .inferSchemaFromIndexFile(carbonFiles[0].getPath(), tableName);
     List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
     for (org.apache.carbondata.format.ColumnSchema thriftColumnSchema : tableInfo
         .getFact_table().getTable_columns()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/459331c3/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
index 80c56fc..52d51b5 100644
--- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
+++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
@@ -20,6 +20,9 @@ package org.apache.carbondata.examples.sdk;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
 import org.apache.carbondata.sdk.file.*;
 
 /**
@@ -60,13 +63,19 @@ public class SDKS3Example {
         }
         writer.close();
         // Read data
+
+        EqualToExpression equalToExpression = new EqualToExpression(
+            new ColumnExpression("name", DataTypes.STRING),
+            new LiteralExpression("robot1", DataTypes.STRING));
+
         CarbonReader reader = CarbonReader
-                .builder(path, "_temp")
-                .projection(new String[]{"name", "age"})
-                .setAccessKey(args[0])
-                .setSecretKey(args[1])
-                .setEndPoint(args[2])
-                .build();
+            .builder(path, "_temp")
+            .projection(new String[]{"name", "age"})
+            .filter(equalToExpression)
+            .setAccessKey(args[0])
+            .setSecretKey(args[1])
+            .setEndPoint(args[2])
+            .build();
 
         System.out.println("\nData:");
         int i = 0;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/459331c3/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index e4748e3..19bf194 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -30,7 +30,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
-import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
+import org.apache.carbondata.core.scan.expression.conditional.*;
 import org.apache.carbondata.core.scan.expression.logical.AndExpression;
 import org.apache.carbondata.core.scan.expression.logical.OrExpression;
 import org.apache.carbondata.core.util.CarbonProperties;
@@ -328,12 +328,12 @@ public class CarbonReaderTest extends TestCase {
     EqualToExpression equalToExpression2 = new EqualToExpression(columnExpression2,
         new LiteralExpression("robot7", DataTypes.STRING));
 
-    OrExpression andExpression = new OrExpression(equalToExpression, equalToExpression2);
+    OrExpression orExpression = new OrExpression(equalToExpression, equalToExpression2);
     CarbonReader reader = CarbonReader
         .builder(path, "_temp")
         .isTransactionalTable(false)
         .projection(new String[]{"name", "age", "doubleField"})
-        .filter(andExpression)
+        .filter(orExpression)
         .build();
 
     int i = 0;
@@ -352,6 +352,221 @@ public class CarbonReaderTest extends TestCase {
   }
 
   @Test
+  public void testReadWithFilterOfNonTransactionalGreaterThan() throws IOException, InterruptedException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[3];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+
+    TestUtil.writeFilesAndVerify(200, new Schema(fields), path, false, false);
+
+    ColumnExpression columnExpression = new ColumnExpression("doubleField", DataTypes.DOUBLE);
+    GreaterThanExpression greaterThanExpression = new GreaterThanExpression(columnExpression,
+        new LiteralExpression("13.5", DataTypes.DOUBLE));
+
+    ColumnExpression columnExpression2 = new ColumnExpression("name", DataTypes.STRING);
+    EqualToExpression equalToExpression2 = new EqualToExpression(columnExpression2,
+        new LiteralExpression("robot7", DataTypes.STRING));
+
+    AndExpression andExpression = new AndExpression(greaterThanExpression, equalToExpression2);
+    CarbonReader reader = CarbonReader
+        .builder(path, "_temp")
+        .isTransactionalTable(false)
+        .projection(new String[]{"name", "age", "doubleField"})
+        .filter(andExpression)
+        .build();
+
+    int i = 0;
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      assert (((String) row[0]).contains("robot7"));
+      assert (7 == ((int) (row[1]) % 10));
+      assert ((double) row[2] > 13.5);
+      i++;
+    }
+    Assert.assertEquals(i, 17);
+
+    reader.close();
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testReadWithFilterOfNonTransactionalLessThan() throws IOException, InterruptedException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[3];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+
+    TestUtil.writeFilesAndVerify(200, new Schema(fields), path, false, false);
+
+    ColumnExpression columnExpression = new ColumnExpression("doubleField", DataTypes.DOUBLE);
+    LessThanExpression lessThanExpression = new LessThanExpression(columnExpression,
+        new LiteralExpression("13.5", DataTypes.DOUBLE));
+
+    ColumnExpression columnExpression2 = new ColumnExpression("name", DataTypes.STRING);
+    EqualToExpression equalToExpression2 = new EqualToExpression(columnExpression2,
+        new LiteralExpression("robot7", DataTypes.STRING));
+
+    AndExpression andExpression = new AndExpression(lessThanExpression, equalToExpression2);
+    CarbonReader reader = CarbonReader
+        .builder(path, "_temp")
+        .isTransactionalTable(false)
+        .projection(new String[]{"name", "age", "doubleField"})
+        .filter(andExpression)
+        .build();
+
+    int i = 0;
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      assert (((String) row[0]).contains("robot7"));
+      assert (7 == ((int) (row[1]) % 10));
+      assert ((double) row[2] < 13.5);
+      i++;
+    }
+    Assert.assertEquals(i, 2);
+
+    reader.close();
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testReadWithFilterOfNonTransactionalNotEqual() throws IOException, InterruptedException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[3];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+
+    TestUtil.writeFilesAndVerify(200, new Schema(fields), path, false, false);
+
+    ColumnExpression columnExpression = new ColumnExpression("doubleField", DataTypes.DOUBLE);
+    LessThanExpression lessThanExpression = new LessThanExpression(columnExpression,
+        new LiteralExpression("13.5", DataTypes.DOUBLE));
+
+    ColumnExpression columnExpression2 = new ColumnExpression("name", DataTypes.STRING);
+    NotEqualsExpression notEqualsExpression = new NotEqualsExpression(columnExpression2,
+        new LiteralExpression("robot7", DataTypes.STRING));
+
+    AndExpression andExpression = new AndExpression(lessThanExpression, notEqualsExpression);
+    CarbonReader reader = CarbonReader
+        .builder(path, "_temp")
+        .isTransactionalTable(false)
+        .projection(new String[]{"name", "age", "doubleField"})
+        .filter(andExpression)
+        .build();
+
+    int i = 0;
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      assert (!((String) row[0]).contains("robot7"));
+      assert (7 != ((int) (row[1]) % 10));
+      assert ((double) row[2] < 13.5);
+      i++;
+    }
+    Assert.assertEquals(i, 25);
+
+    reader.close();
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testReadWithFilterOfNonTransactionalIn() throws IOException, InterruptedException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[3];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+
+    TestUtil.writeFilesAndVerify(200, new Schema(fields), path, false, false);
+
+    ColumnExpression columnExpression = new ColumnExpression("doubleField", DataTypes.DOUBLE);
+    LessThanExpression lessThanExpression = new LessThanExpression(columnExpression,
+        new LiteralExpression("13.5", DataTypes.DOUBLE));
+
+    ColumnExpression columnExpression2 = new ColumnExpression("name", DataTypes.STRING);
+    InExpression inExpression = new InExpression(columnExpression2,
+        new LiteralExpression("robot7", DataTypes.STRING));
+
+    AndExpression andExpression = new AndExpression(lessThanExpression, inExpression);
+    CarbonReader reader = CarbonReader
+        .builder(path, "_temp")
+        .isTransactionalTable(false)
+        .projection(new String[]{"name", "age", "doubleField"})
+        .filter(andExpression)
+        .build();
+
+    int i = 0;
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      assert (((String) row[0]).contains("robot7"));
+      assert (7 == ((int) (row[1]) % 10));
+      assert ((double) row[2] < 13.5);
+      i++;
+    }
+    Assert.assertEquals(i, 2);
+
+    reader.close();
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testReadWithFilterOfNonTransactionalNotIn() throws IOException, InterruptedException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[3];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+
+    TestUtil.writeFilesAndVerify(200, new Schema(fields), path, false, false);
+
+    ColumnExpression columnExpression = new ColumnExpression("doubleField", DataTypes.DOUBLE);
+    LessThanExpression lessThanExpression = new LessThanExpression(columnExpression,
+        new LiteralExpression("13.5", DataTypes.DOUBLE));
+
+    ColumnExpression columnExpression2 = new ColumnExpression("name", DataTypes.STRING);
+    NotInExpression notInExpression = new NotInExpression(columnExpression2,
+        new LiteralExpression("robot7", DataTypes.STRING));
+
+    AndExpression andExpression = new AndExpression(lessThanExpression, notInExpression);
+    CarbonReader reader = CarbonReader
+        .builder(path, "_temp")
+        .isTransactionalTable(false)
+        .projection(new String[]{"name", "age", "doubleField"})
+        .filter(andExpression)
+        .build();
+
+    int i = 0;
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      assert (!((String) row[0]).contains("robot7"));
+      assert (7 != ((int) (row[1]) % 10));
+      assert ((double) row[2] < 13.5);
+      i++;
+    }
+    Assert.assertEquals(i, 25);
+
+    reader.close();
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
   public void testWriteAndReadFilesWithReaderBuildFail() throws IOException, InterruptedException {
     String path1 = "./testWriteFiles";
     String path2 = "./testWriteFiles2";