Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/06/12 15:07:29 UTC

[2/2] carbondata git commit: [CARBONDATA-2603] Fix: error handling during reader build failure

[CARBONDATA-2603] Fix: error handling during reader build failure

Problem:
When CarbonReaderBuilder.build() fails, for example because an invalid
projection makes query model creation fail, the Blocklet datamap is not
cleared for that table. The next reader instance then uses the stale
Blocklet datamap, which causes an error.

Solution: Clear the Blocklet datamap if the reader build fails.

This closes #2368
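
For illustration, a minimal SDK sketch of the scenario described above,
assuming a store has already been written to ./testWriteFiles with columns
c1 and c2 (as in the new test in this commit); the class name and the
nonexistent projection column "c3" are hypothetical:

    import java.io.IOException;

    import org.apache.carbondata.sdk.file.CarbonReader;

    public class ReaderBuildFailureSketch {
      public static void main(String[] args) throws IOException, InterruptedException {
        try {
          // Column "c3" does not exist, so query model creation fails inside
          // build(). Before this fix, the failed build left a stale Blocklet
          // datamap cached for the table.
          CarbonReader.builder("./testWriteFiles", "_temp")
              .projection(new String[] { "c1", "c3" })
              .isTransactionalTable(false)
              .build();
        } catch (Exception e) {
          // Expected: the invalid projection is rejected.
        }

        // With the fix, build() clears the cached Blocklet datamap on failure,
        // so the next reader starts from a clean cache instead of erroring out.
        CarbonReader reader = CarbonReader.builder("./testWriteFiles", "_temp")
            .projection(new String[] { "c1", "c2" })
            .isTransactionalTable(false)
            .build();
        while (reader.hasNext()) {
          Object[] row = (Object[]) reader.readNextRow();
          System.out.println(row[0] + ", " + row[1]);
        }
        reader.close();
      }
    }

As the diff below shows, the fix wraps getSplits() and record reader creation
in a try/catch and, on any failure, clears the datamap cache for the table's
AbsoluteTableIdentifier via DataMapStoreManager.clearDataMaps().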


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e07b832f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e07b832f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e07b832f

Branch: refs/heads/branch-1.4
Commit: e07b832f557d91e9663feb1457bc44cea5121966
Parents: 288aba1
Author: ajantha-bhat <aj...@gmail.com>
Authored: Mon Jun 11 19:17:33 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Jun 12 20:36:59 2018 +0530

----------------------------------------------------------------------
 .../sdk/file/CarbonReaderBuilder.java           | 39 ++++++++-----
 .../carbondata/sdk/file/CarbonReaderTest.java   | 61 ++++++++++++++++++++
 2 files changed, 84 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e07b832f/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index 83cb34e..ebee41a 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -24,6 +24,7 @@ import java.util.Objects;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.expression.Expression;
@@ -200,23 +201,29 @@ public class CarbonReaderBuilder {
       format.setColumnProjection(job.getConfiguration(), projectionColumns);
     }
 
-    final List<InputSplit> splits =
-        format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
-
-    List<RecordReader<Void, T>> readers = new ArrayList<>(splits.size());
-    for (InputSplit split : splits) {
-      TaskAttemptContextImpl attempt =
-          new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
-      RecordReader reader = format.createRecordReader(split, attempt);
-      try {
-        reader.initialize(split, attempt);
-        readers.add(reader);
-      } catch (Exception e) {
-        reader.close();
-        throw e;
+    try {
+      final List<InputSplit> splits =
+          format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
+
+      List<RecordReader<Void, T>> readers = new ArrayList<>(splits.size());
+      for (InputSplit split : splits) {
+        TaskAttemptContextImpl attempt =
+            new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
+        RecordReader reader = format.createRecordReader(split, attempt);
+        try {
+          reader.initialize(split, attempt);
+          readers.add(reader);
+        } catch (Exception e) {
+          reader.close();
+          throw e;
+        }
       }
+      return new CarbonReader<>(readers);
+    } catch (Exception ex) {
+      // Clear the datamap cache as it can get added in getSplits() method
+      DataMapStoreManager.getInstance()
+          .clearDataMaps(table.getAbsoluteTableIdentifier());
+      throw ex;
     }
-
-    return new CarbonReader<>(readers);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e07b832f/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index fb2e2bc..2bc4b1f 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -353,6 +353,67 @@ public class CarbonReaderTest extends TestCase {
   }
 
   @Test
+  public void testWriteAndReadFilesWithReaderBuildFail() throws IOException, InterruptedException {
+    String path1 = "./testWriteFiles";
+    String path2 = "./testWriteFiles2";
+    FileUtils.deleteDirectory(new File(path1));
+    FileUtils.deleteDirectory(new File(path2));
+
+    Field[] fields = new Field[] { new Field("c1", "string"),
+         new Field("c2", "int") };
+    Schema schema = new Schema(fields);
+    CarbonWriterBuilder builder = CarbonWriter.builder();
+
+    CarbonWriter carbonWriter = null;
+    try {
+      carbonWriter = builder.outputPath(path1).isTransactionalTable(false).uniqueIdentifier(12345)
+  .buildWriterForCSVInput(schema);
+    } catch (InvalidLoadOptionException e) {
+      e.printStackTrace();
+    }
+    carbonWriter.write(new String[] { "MNO", "100" });
+    carbonWriter.close();
+
+    Field[] fields1 = new Field[] { new Field("p1", "string"),
+         new Field("p2", "int") };
+    Schema schema1 = new Schema(fields1);
+    CarbonWriterBuilder builder1 = CarbonWriter.builder();
+
+    CarbonWriter carbonWriter1 = null;
+    try {
+      carbonWriter1 = builder1.outputPath(path2).isTransactionalTable(false).uniqueIdentifier(12345)
+   .buildWriterForCSVInput(schema1);
+    } catch (InvalidLoadOptionException e) {
+      e.printStackTrace();
+    }
+    carbonWriter1.write(new String[] { "PQR", "200" });
+    carbonWriter1.close();
+
+    try {
+       CarbonReader reader =
+       CarbonReader.builder(path1, "_temp").
+       projection(new String[] { "c1", "c3" })
+       .isTransactionalTable(false).build();
+    } catch (Exception e){
+       System.out.println("Success");
+    }
+    CarbonReader reader1 =
+         CarbonReader.builder(path2, "_temp1")
+     .projection(new String[] { "p1", "p2" })
+     .isTransactionalTable(false).build();
+
+    while (reader1.hasNext()) {
+       Object[] row1 = (Object[]) reader1.readNextRow();
+       System.out.println(row1[0]);
+       System.out.println(row1[1]);
+    }
+    reader1.close();
+
+    FileUtils.deleteDirectory(new File(path1));
+    FileUtils.deleteDirectory(new File(path2));
+  }
+
+  @Test
   public void testReadColumnTwice() throws IOException, InterruptedException {
     String path = "./testWriteFiles";
     FileUtils.deleteDirectory(new File(path));