Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/06/08 11:41:10 UTC

[45/50] [abbrv] carbondata git commit: [CARBONDATA-2559] Task ID set for each CarbonReader in ThreadLocal

[CARBONDATA-2559] Task ID set for each CarbonReader in ThreadLocal

1. Set a task ID for each CarbonReader, because each CarbonReader object should have its own thread-local task info.
2. Fixed DESCRIBE FORMATTED so that it shows the default SORT_COLUMNS when no sort columns are given to CarbonWriter. An explicitly empty sort-column list now means no sort columns.
3. Issue: CarbonReaderBuilder closed each record reader right after initializing it, so when CarbonReader moved on to the next batch it threw a NullPointerException because the reader was already closed.
Solution: a reader is now closed only if an exception is encountered during initialization; otherwise the user must close it explicitly.
4. Moved the CarbonProperties API in the SDK guide to a common API list, because property setting applies to both CarbonReader and CarbonWriter.


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/92d9b925
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/92d9b925
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/92d9b925

Branch: refs/heads/spark-2.3
Commit: 92d9b9256373763f05736e29d93b7e835e0da3dd
Parents: 4bb7e27
Author: rahulforallp <ra...@knoldus.in>
Authored: Tue May 29 10:23:46 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Mon Jun 4 17:49:05 2018 +0530

----------------------------------------------------------------------
 docs/sdk-guide.md                               | 95 ++++++++++----------
 .../TestNonTransactionalCarbonTable.scala       | 13 +--
 .../carbondata/sdk/file/CarbonReader.java       |  5 ++
 .../sdk/file/CarbonReaderBuilder.java           | 10 ++-
 .../sdk/file/CarbonWriterBuilder.java           |  4 +-
 .../sdk/file/CSVCarbonWriterTest.java           |  2 +-
 .../carbondata/sdk/file/CarbonReaderTest.java   | 41 ++++-----
 .../apache/carbondata/sdk/file/TestUtil.java    |  4 +-
 .../carbondata/store/LocalCarbonStoreTest.java  |  2 +-
 9 files changed, 96 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/92d9b925/docs/sdk-guide.md
----------------------------------------------------------------------
diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md
index 2371b33..5dbb5ac 100644
--- a/docs/sdk-guide.md
+++ b/docs/sdk-guide.md
@@ -350,52 +350,6 @@ public Schema(Field[] fields);
 public static Schema parseJson(String json);
 ```
 
-### Class org.apache.carbondata.core.util.CarbonProperties
-
-```
-/**
-* This method will be responsible to get the instance of CarbonProperties class
-*
-* @return carbon properties instance
-*/
-public static CarbonProperties getInstance();
-```
-
-```
-/**
-* This method will be used to add a new property
-*
-* @param key is a property name to set for carbon.
-* @param value is valid parameter corresponding to property.
-* @return CarbonProperties object
-*/
-public CarbonProperties addProperty(String key, String value);
-```
-
-```
-/**
-* This method will be used to get the property value. If property is not
-* present, then it will return the default value.
-*
-* @param key is a property name to get user specified value.
-* @return properties value for corresponding key. If not set, then returns null.
-*/
-public String getProperty(String key);
-```
-
-```
-/**
-* This method will be used to get the property value. If property is not
-* present, then it will return the default value.
-*
-* @param key is a property name to get user specified value..
-* @param defaultValue used to be returned by function if corrosponding key not set.
-* @return properties value for corresponding key. If not set, then returns specified defaultValue.
-*/
-public String getProperty(String key, String defaultValue);
-```
-Reference : [list of carbon properties](http://carbondata.apache.org/configuration-parameters.html)
-
 ### Class org.apache.carbondata.sdk.file.AvroCarbonWriter
 ```
 /**
@@ -705,3 +659,52 @@ Find example code at [CarbonReaderExample](https://github.com/apache/carbondata/
 ```
 
 Find S3 example code at [SDKS3Example](https://github.com/apache/carbondata/blob/master/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java) in the CarbonData repo.
+
+
+# Common API List for CarbonReader and CarbonWriter
+
+### Class org.apache.carbondata.core.util.CarbonProperties
+
+```
+/**
+* This method will be responsible to get the instance of CarbonProperties class
+*
+* @return carbon properties instance
+*/
+public static CarbonProperties getInstance();
+```
+
+```
+/**
+* This method will be used to add a new property
+*
+* @param key is a property name to set for carbon.
+* @param value is valid parameter corresponding to property.
+* @return CarbonProperties object
+*/
+public CarbonProperties addProperty(String key, String value);
+```
+
+```
+/**
+* This method will be used to get the property value. If property is not
+* present, then it will return the default value.
+*
+* @param key is a property name to get user specified value.
+* @return properties value for corresponding key. If not set, then returns null.
+*/
+public String getProperty(String key);
+```
+
+```
+/**
+* This method will be used to get the property value. If property is not
+* present, then it will return the default value.
+*
+* @param key is a property name to get user specified value.
+* @param defaultValue used to be returned by function if corresponding key not set.
+* @return properties value for corresponding key. If not set, then returns specified defaultValue.
+*/
+public String getProperty(String key, String defaultValue);
+```
+Reference : [list of carbon properties](http://carbondata.apache.org/configuration-parameters.html)
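
Because getInstance() returns a shared instance, a property added while setting up a CarbonWriter should also be visible to a CarbonReader built later in the same JVM, which is why the API now sits in a common list. The minimal sketch below models only the documented contract: getInstance(), addProperty() returning the same object for chaining, and getProperty() returning null (following the @return tag) or a caller-supplied default. PropertiesSketch and the keys "example.key" and "missing.key" are hypothetical; with the real SDK the equivalent call is CarbonProperties.getInstance().addProperty(key, value).

```java
import java.util.Properties;

/**
 * Hypothetical stand-in for the documented CarbonProperties contract.
 * It is NOT org.apache.carbondata.core.util.CarbonProperties.
 */
public final class PropertiesSketch {
  private static final PropertiesSketch INSTANCE = new PropertiesSketch();
  private final Properties props = new Properties();

  private PropertiesSketch() { }

  /** Shared instance, analogous to CarbonProperties.getInstance(). */
  public static PropertiesSketch getInstance() { return INSTANCE; }

  /** Stores a property and returns this object so calls can be chained. */
  public PropertiesSketch addProperty(String key, String value) {
    props.setProperty(key, value);
    return this;
  }

  /** Returns the value, or null if the key was never set. */
  public String getProperty(String key) { return props.getProperty(key); }

  /** Returns the value, or defaultValue if the key was never set. */
  public String getProperty(String key, String defaultValue) {
    return props.getProperty(key, defaultValue);
  }

  public static void main(String[] args) {
    // Set once (e.g. before building a writer); any later caller sees the same value.
    PropertiesSketch.getInstance().addProperty("example.key", "42");
    System.out.println(PropertiesSketch.getInstance().getProperty("example.key"));      // prints "42"
    System.out.println(PropertiesSketch.getInstance().getProperty("missing.key"));      // prints "null"
    System.out.println(PropertiesSketch.getInstance().getProperty("missing.key", "d")); // prints "d"
  }
}
```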

http://git-wip-us.apache.org/repos/asf/carbondata/blob/92d9b925/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 0083733..5beb9c4 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -378,7 +378,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
          |'carbondata' LOCATION
          |'$writerPath' """.stripMargin)
 
-    checkExistence(sql("describe formatted sdkOutputTable"), true, "name")
+    checkExistence(sql("describe formatted sdkOutputTable"), true, "SORT_COLUMNS                        name")
 
     buildTestDataWithSortColumns(List())
     assert(new File(writerPath).exists())
@@ -390,15 +390,18 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
          |'carbondata' LOCATION
          |'$writerPath' """.stripMargin)
 
-    sql("describe formatted sdkOutputTable").show(false)
+    checkExistence(sql("describe formatted sdkOutputTable"),false,"SORT_COLUMNS                        name")
     sql("select * from sdkOutputTable").show()
 
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+
     intercept[RuntimeException] {
       buildTestDataWithSortColumns(List(""))
     }
-
-    sql("DROP TABLE sdkOutputTable")
-    // drop table should not delete the files
+    
     assert(!(new File(writerPath).exists()))
     cleanTestData()
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/92d9b925/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
index 81db7b2..9af710f 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
@@ -24,6 +24,8 @@ import java.util.List;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.util.CarbonTaskInfo;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 
 import org.apache.hadoop.mapreduce.RecordReader;
 
@@ -54,6 +56,9 @@ public class CarbonReader<T> {
     this.readers = readers;
     this.index = 0;
     this.currentReader = readers.get(0);
+    CarbonTaskInfo carbonTaskInfo = new CarbonTaskInfo();
+    carbonTaskInfo.setTaskId(System.nanoTime());
+    ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo);
   }
 
   /**
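
The constructor change stores a new CarbonTaskInfo, whose task ID is System.nanoTime(), in ThreadLocalTaskInfo. Thread-local storage holds one value per thread, so a reader constructed on another thread gets its own task info, while a second reader constructed on the same thread replaces the first one's. The sketch below illustrates this; TaskInfo, TaskInfoHolder and SketchReader are simplified, hypothetical stand-ins (not the CarbonData classes), and backing the holder with a plain java.lang.ThreadLocal is an assumption of the sketch.

```java
// Hypothetical stand-ins: TaskInfo ~ CarbonTaskInfo, TaskInfoHolder ~ ThreadLocalTaskInfo,
// SketchReader ~ the patched CarbonReader constructor. None of these are CarbonData classes.
public class ThreadLocalTaskIdSketch {

  /** Carries a task ID, like the CarbonTaskInfo object created in the constructor. */
  static final class TaskInfo {
    private long taskId;

    void setTaskId(long taskId) { this.taskId = taskId; }

    long getTaskId() { return taskId; }
  }

  /** Holds one TaskInfo per thread (assumption: a plain java.lang.ThreadLocal). */
  static final class TaskInfoHolder {
    private static final ThreadLocal<TaskInfo> INFO = new ThreadLocal<>();

    static void set(TaskInfo info) { INFO.set(info); }

    static TaskInfo get() { return INFO.get(); }
  }

  /** Registers a fresh task ID for the calling thread, as the patched constructor does. */
  static final class SketchReader {
    SketchReader() {
      TaskInfo info = new TaskInfo();
      info.setTaskId(System.nanoTime()); // same ID source as the commit
      TaskInfoHolder.set(info);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    new SketchReader();
    TaskInfo mainInfo = TaskInfoHolder.get();

    // A reader built on a second thread stores its TaskInfo in that thread's slot only.
    TaskInfo[] otherInfo = new TaskInfo[1];
    Thread worker = new Thread(() -> {
      new SketchReader();
      otherInfo[0] = TaskInfoHolder.get();
    });
    worker.start();
    worker.join(); // join() makes the worker's write visible to this thread

    System.out.println(otherInfo[0] != mainInfo);         // prints "true"
    System.out.println(TaskInfoHolder.get() == mainInfo); // prints "true"
  }
}
```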

http://git-wip-us.apache.org/repos/asf/carbondata/blob/92d9b925/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index e99ff0d..9d7470e 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -233,9 +233,13 @@ public class CarbonReaderBuilder {
       TaskAttemptContextImpl attempt =
           new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
       RecordReader reader = format.createRecordReader(split, attempt);
-      reader.initialize(split, attempt);
-      reader.close();
-      readers.add(reader);
+      try {
+        reader.initialize(split, attempt);
+        readers.add(reader);
+      } catch (Exception e) {
+        reader.close();
+        throw e;
+      }
     }
 
     return new CarbonReader<>(readers);
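
After this change the builder closes a RecordReader only when its initialize() call fails, and then rethrows the exception; readers that initialize successfully stay open so that CarbonReader can iterate over them, and the user is responsible for calling close(). The sketch below reproduces that ownership rule with a hypothetical FakeRecordReader and build() helper standing in for the Hadoop RecordReader and CarbonReaderBuilder.build().

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// FakeRecordReader and build() are hypothetical; they only mirror the loop in the diff.
public class CloseOnFailureSketch {

  /** Reader that can be told to fail during initialization and records whether it was closed. */
  static final class FakeRecordReader implements Closeable {
    final boolean failOnInit;
    boolean closed;

    FakeRecordReader(boolean failOnInit) { this.failOnInit = failOnInit; }

    void initialize() throws IOException {
      if (failOnInit) {
        throw new IOException("initialization failed");
      }
    }

    @Override
    public void close() { closed = true; }
  }

  /** Keeps initialized readers open; closes only a reader whose initialization fails. */
  static List<FakeRecordReader> build(List<FakeRecordReader> candidates) throws IOException {
    List<FakeRecordReader> readers = new ArrayList<>();
    for (FakeRecordReader reader : candidates) {
      try {
        reader.initialize();
        readers.add(reader); // stays open: iteration still needs it
      } catch (IOException e) {
        reader.close();      // release the failing reader, then propagate
        throw e;
      }
    }
    return readers;
  }

  public static void main(String[] args) throws IOException {
    FakeRecordReader good = new FakeRecordReader(false);
    List<FakeRecordReader> readers = build(Collections.singletonList(good));
    System.out.println(readers.size()); // prints "1"
    System.out.println(good.closed);    // prints "false": still open for iteration
    good.close();                       // the caller closes it when done

    FakeRecordReader bad = new FakeRecordReader(true);
    try {
      build(Collections.singletonList(bad));
    } catch (IOException expected) {
      System.out.println(bad.closed);   // prints "true": the builder released it
    }
  }
}
```

As in the diff, readers added to the list before a later one fails are not closed by this loop.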

http://git-wip-us.apache.org/repos/asf/carbondata/blob/92d9b925/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index e2dc8c2..bd64568 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -379,7 +379,7 @@ public class CarbonWriterBuilder {
     }
 
     List<String> sortColumnsList = new ArrayList<>();
-    if (sortColumns == null || sortColumns.length == 0) {
+    if (sortColumns == null) {
       // If sort columns are not specified, default set all dimensions to sort column.
       // When dimensions are default set to sort column,
       // Inverted index will be supported by default for sort columns.
@@ -484,7 +484,7 @@ public class CarbonWriterBuilder {
           if (isSortColumn > -1) {
             columnSchema.setSortColumn(true);
             sortColumnsSchemaList[isSortColumn] = columnSchema;
-          } else if (sortColumnsList.isEmpty() && columnSchema.isDimensionColumn()
+          } else if (!sortColumnsList.isEmpty() && columnSchema.isDimensionColumn()
               && columnSchema.getNumberOfChild() < 1) {
             columnSchema.setSortColumn(true);
             sortColumnsSchemaList[i] = columnSchema;
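
With the condition narrowed from `sortColumns == null || sortColumns.length == 0` to `sortColumns == null`, only an unspecified (null) sort-column array falls back to the default of all dimensions, while an explicitly empty array now means no sort columns. That matches the updated test, which expects SORT_COLUMNS not to list name after buildTestDataWithSortColumns(List()). The sketch below shows just this null-versus-empty rule; Column and resolveSortColumns are hypothetical, and marking a column as a dimension with a boolean flag is a simplification (the real builder also handles the column schema details shown in the diff).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Column and resolveSortColumns are hypothetical illustrations of the null-vs-empty rule.
public class SortColumnsDefaultSketch {

  static final class Column {
    final String name;
    final boolean dimension;

    Column(String name, boolean dimension) {
      this.name = name;
      this.dimension = dimension;
    }
  }

  static List<String> resolveSortColumns(String[] sortColumns, List<Column> schema) {
    if (sortColumns == null) {
      // Not specified: default every dimension column to a sort column.
      List<String> defaults = new ArrayList<>();
      for (Column column : schema) {
        if (column.dimension) {
          defaults.add(column.name);
        }
      }
      return defaults;
    }
    // Specified, possibly empty: use exactly what the caller passed (empty means none).
    return Arrays.asList(sortColumns);
  }

  public static void main(String[] args) {
    List<Column> schema = Arrays.asList(new Column("name", true), new Column("age", false));
    System.out.println(resolveSortColumns(null, schema));                // prints "[name]"
    System.out.println(resolveSortColumns(new String[0], schema));       // prints "[]"
    System.out.println(resolveSortColumns(new String[]{"age"}, schema)); // prints "[age]"
  }
}
```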

http://git-wip-us.apache.org/repos/asf/carbondata/blob/92d9b925/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
index 1eed47b..865097b 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
@@ -205,7 +205,7 @@ public class CSVCarbonWriterTest {
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
 
-    TestUtil.writeFilesAndVerify(new Schema(fields), path, true);
+    TestUtil.writeFilesAndVerify(100, new Schema(fields), path, true);
 
     String schemaFile = CarbonTablePath.getSchemaFilePath(path);
     Assert.assertTrue(new File(schemaFile).exists());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/92d9b925/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index 95c25f8..db118cd 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -59,28 +59,28 @@ public class CarbonReaderTest extends TestCase {
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
 
-    TestUtil.writeFilesAndVerify(new Schema(fields), path, true);
+    TestUtil.writeFilesAndVerify(200, new Schema(fields), path, true);
 
     CarbonReader reader = CarbonReader.builder(path, "_temp").isTransactionalTable(true)
         .projection(new String[]{"name", "age"}).build();
 
     // expected output after sorting
-    String[] name = new String[100];
-    int[] age = new int[100];
-    for (int i = 0; i < 100; i++) {
+    String[] name = new String[200];
+    Integer[] age = new Integer[200];
+    for (int i = 0; i < 200; i++) {
       name[i] = "robot" + (i / 10);
-      age[i] = (i % 10) * 10 + i / 10;
+      age[i] = i;
     }
 
     int i = 0;
     while (reader.hasNext()) {
       Object[] row = (Object[]) reader.readNextRow();
       // Default sort column is applied for dimensions. So, need  to validate accordingly
-      Assert.assertEquals(name[i], row[0]);
-      Assert.assertEquals(age[i], row[1]);
+      assert(Arrays.asList(name).contains(row[0]));
+      assert(Arrays.asList(age).contains(row[1]));
       i++;
     }
-    Assert.assertEquals(i, 100);
+    Assert.assertEquals(i, 200);
 
     reader.close();
 
@@ -95,11 +95,11 @@ public class CarbonReaderTest extends TestCase {
     while (reader2.hasNext()) {
       Object[] row = (Object[]) reader2.readNextRow();
       // Default sort column is applied for dimensions. So, need  to validate accordingly
-      Assert.assertEquals(name[i], row[0]);
-      Assert.assertEquals(age[i], row[1]);
+      assert(Arrays.asList(name).contains(row[0]));
+      assert(Arrays.asList(age).contains(row[1]));
       i++;
     }
-    Assert.assertEquals(i, 100);
+    Assert.assertEquals(i, 200);
     reader2.close();
 
     FileUtils.deleteDirectory(new File(path));
@@ -114,7 +114,7 @@ public class CarbonReaderTest extends TestCase {
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
 
-    TestUtil.writeFilesAndVerify(new Schema(fields), path, true);
+    TestUtil.writeFilesAndVerify(100, new Schema(fields), path, true);
 
     CarbonReader reader = CarbonReader
         .builder(path, "_temp")
@@ -156,7 +156,7 @@ public class CarbonReaderTest extends TestCase {
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
 
-    TestUtil.writeFilesAndVerify(new Schema(fields), path, true);
+    TestUtil.writeFilesAndVerify(100, new Schema(fields), path, true);
 
     CarbonReader reader = CarbonReader
         .builder(path, "_temp")
@@ -193,7 +193,7 @@ public class CarbonReaderTest extends TestCase {
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
 
-    TestUtil.writeFilesAndVerify(new Schema(fields), path, true);
+    TestUtil.writeFilesAndVerify(100, new Schema(fields), path, true);
 
     CarbonReader reader = CarbonReader.builder(path, "_temp").isTransactionalTable(true)
         .projection(new String[]{"name", "age"}).build();
@@ -233,7 +233,7 @@ public class CarbonReaderTest extends TestCase {
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
 
-    TestUtil.writeFilesAndVerify(new Schema(fields), path, true);
+    TestUtil.writeFilesAndVerify(100, new Schema(fields), path, true);
 
     CarbonReader reader = CarbonReader
         .builder(path)
@@ -309,7 +309,7 @@ public class CarbonReaderTest extends TestCase {
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
 
-    TestUtil.writeFilesAndVerify(new Schema(fields), path, true);
+    TestUtil.writeFilesAndVerify(100, new Schema(fields), path, true);
 
     File[] dataFiles = new File(path + "/Fact/Part0/Segment_null/").listFiles(new FilenameFilter() {
       @Override public boolean accept(File dir, String name) {
@@ -337,7 +337,7 @@ public class CarbonReaderTest extends TestCase {
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
 
-    TestUtil.writeFilesAndVerify(new Schema(fields), path, true);
+    TestUtil.writeFilesAndVerify(100, new Schema(fields), path, true);
 
     File[] dataFiles = new File(path + "/Metadata").listFiles(new FilenameFilter() {
       @Override public boolean accept(File dir, String name) {
@@ -887,7 +887,7 @@ public class CarbonReaderTest extends TestCase {
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
 
-    TestUtil.writeFilesAndVerify(new Schema(fields), path, true);
+    TestUtil.writeFilesAndVerify(100, new Schema(fields), path, true);
 
     CarbonReader reader = CarbonReader
         .builder(path, "_temp")
@@ -926,7 +926,7 @@ public class CarbonReaderTest extends TestCase {
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
 
-    TestUtil.writeFilesAndVerify(new Schema(fields), path, true);
+    TestUtil.writeFilesAndVerify(100, new Schema(fields), path, true);
 
     CarbonReader reader = CarbonReader
         .builder(path, "_temp")
@@ -948,6 +948,7 @@ public class CarbonReaderTest extends TestCase {
       Assert.assertEquals(age[i], row[1]);
       i++;
     }
+    reader.close();
     Assert.assertEquals(i, 100);
   }
 
@@ -960,7 +961,7 @@ public class CarbonReaderTest extends TestCase {
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
 
-    TestUtil.writeFilesAndVerify(new Schema(fields), path, true);
+    TestUtil.writeFilesAndVerify(100, new Schema(fields), path, true);
 
     try {
       CarbonReader reader = CarbonReader

http://git-wip-us.apache.org/repos/asf/carbondata/blob/92d9b925/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
index eb406e2..0f00d61 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
@@ -39,8 +39,8 @@ public class TestUtil {
     writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1, true);
   }
 
-  public static void writeFilesAndVerify(Schema schema, String path, boolean persistSchema) {
-    writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1, true);
+  public static void writeFilesAndVerify(int rows, Schema schema, String path, boolean persistSchema) {
+    writeFilesAndVerify(rows, schema, path, null, persistSchema, -1, -1, true);
   }
 
   public static void writeFilesAndVerify(Schema schema, String path, boolean persistSchema,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/92d9b925/store/sdk/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java b/store/sdk/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
index 51d0b27..c885a26 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
@@ -56,7 +56,7 @@ public class LocalCarbonStoreTest {
     fields[0] = new Field("name", DataTypes.STRING);
     fields[1] = new Field("age", DataTypes.INT);
 
-    TestUtil.writeFilesAndVerify(new Schema(fields), path, true);
+    TestUtil.writeFilesAndVerify(100, new Schema(fields), path, true);
 
     CarbonStore store = new LocalCarbonStore();
     Iterator<CarbonRow> rows = store.scan(path, new String[]{"name, age"}, null);