You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/01/08 07:29:04 UTC

carbondata git commit: [CARBONDATA-3221] Fix the error that the SDK doesn't support reading multiple files from S3

Repository: carbondata
Updated Branches:
  refs/heads/master 45951c763 -> c0ba982e6


[CARBONDATA-3221] Fix the error that the SDK doesn't support reading multiple files from S3

The SDK reader works when a filter is used, but when data is read without a filter, ((CarbonInputSplit) inputSplit).getDetailInfo().getBlockFooterOffset() is 0 and
the FileReader is not closed after readByteBuffer in org.apache.carbondata.hadoop.util.CarbonVectorizedRecordReader#initialize, so we should invoke finish() on the reader after readByteBuffer.

This closes #3051


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c0ba982e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c0ba982e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c0ba982e

Branch: refs/heads/master
Commit: c0ba982e61ba5393168f8664b87ee27cd249d1ca
Parents: 45951c7
Author: xubo245 <xu...@huawei.com>
Authored: Fri Jan 4 16:53:48 2019 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Jan 8 12:58:56 2019 +0530

----------------------------------------------------------------------
 .../carbondata/examples/sdk/SDKS3Example.java   | 55 +++++++++++++++-----
 .../util/CarbonVectorizedRecordReader.java      |  1 +
 2 files changed, 43 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0ba982e/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
index f9eae9e..33642bf 100644
--- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
+++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
@@ -33,6 +33,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.log4j.Logger;
 
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
 /**
  * Example for testing CarbonWriter on S3
  */
@@ -41,7 +45,7 @@ public class SDKS3Example {
         Logger logger = LogServiceFactory.getLogService(SDKS3Example.class.getName());
         if (args == null || args.length < 3) {
             logger.error("Usage: java CarbonS3Example: <access-key> <secret-key>"
-                + "<s3-endpoint> [table-path-on-s3] [rows]");
+                + "<s3-endpoint> [table-path-on-s3] [rows] [Number of writes]");
             System.exit(0);
         }
 
@@ -56,9 +60,13 @@ public class SDKS3Example {
             path=args[3];
         }
 
-        int num = 3;
+        int rows = 3;
         if (args.length > 4) {
-            num = Integer.parseInt(args[4]);
+            rows = Integer.parseInt(args[4]);
+        }
+        int num = 3;
+        if (args.length > 5) {
+            num = Integer.parseInt(args[5]);
         }
 
         Configuration conf = new Configuration(true);
@@ -69,18 +77,20 @@ public class SDKS3Example {
         Field[] fields = new Field[2];
         fields[0] = new Field("name", DataTypes.STRING);
         fields[1] = new Field("age", DataTypes.INT);
-        CarbonWriter writer = CarbonWriter
-            .builder()
-            .outputPath(path)
-            .withHadoopConf(conf)
-            .withCsvInput(new Schema(fields))
-            .writtenBy("SDKS3Example")
-            .build();
+        for (int j = 0; j < num; j++) {
+            CarbonWriter writer = CarbonWriter
+                .builder()
+                .outputPath(path)
+                .withHadoopConf(conf)
+                .withCsvInput(new Schema(fields))
+                .writtenBy("SDKS3Example")
+                .build();
 
-        for (int i = 0; i < num; i++) {
-            writer.write(new String[]{"robot" + (i % 10), String.valueOf(i)});
+            for (int i = 0; i < rows; i++) {
+                writer.write(new String[]{"robot" + (i % 10), String.valueOf(i)});
+            }
+            writer.close();
         }
-        writer.close();
         // Read data
 
         EqualToExpression equalToExpression = new EqualToExpression(
@@ -104,6 +114,25 @@ public class SDKS3Example {
         System.out.println("\nFinished");
         reader.close();
 
+        // Read without filter
+        CarbonReader reader2 = CarbonReader
+            .builder(path, "_temp")
+            .projection(new String[]{"name", "age"})
+            .withHadoopConf(ACCESS_KEY, args[0])
+            .withHadoopConf(SECRET_KEY, args[1])
+            .withHadoopConf(ENDPOINT, args[2])
+            .build();
+
+        System.out.println("\nData:");
+        i = 0;
+        while (i < 20 && reader2.hasNext()) {
+            Object[] row = (Object[]) reader2.readNextRow();
+            System.out.println(row[0] + " " + row[1]);
+            i++;
+        }
+        System.out.println("\nFinished");
+        reader2.close();
+
         CarbonProperties.getInstance()
             .addProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
                 backupProperty);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0ba982e/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
index d66bdd1..e18a4d4 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
@@ -90,6 +90,7 @@ public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
                 ((CarbonInputSplit) inputSplit).getDetailInfo().getBlockSize() - 8,
                 8);
         ((CarbonInputSplit) inputSplit).getDetailInfo().setBlockFooterOffset(buffer.getLong());
+        reader.finish();
       }
       splitList = new ArrayList<>(1);
       splitList.add((CarbonInputSplit) inputSplit);