You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/06/08 11:40:48 UTC

[23/50] [abbrv] carbondata git commit: [CARBONDATA-2491] Fix the error when reader read twice with SDK carbonReader

[CARBONDATA-2491] Fix the error when reader read twice with SDK carbonReader

This PR includes:
1. Fix the index-out-of-bounds error when a reader reads data twice with the SDK CarbonReader
2. Fix the java.lang.NegativeArraySizeException
3. Add test cases for timestamp and bad-record handling
4. Support parallel reading with two readers

This closes #2318


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a7ac6564
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a7ac6564
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a7ac6564

Branch: refs/heads/spark-2.3
Commit: a7ac65648c827be74b77bd6ef1f715508ba53a2c
Parents: 6cc86db
Author: xubo245 <xu...@huawei.com>
Authored: Fri May 18 15:40:16 2018 +0800
Committer: kunal642 <ku...@gmail.com>
Committed: Thu May 24 19:23:26 2018 +0530

----------------------------------------------------------------------
 .../core/datamap/DataMapStoreManager.java       |   2 +-
 .../scan/result/iterator/ChunkRowIterator.java  |  18 +-
 .../carbondata/hadoop/CarbonRecordReader.java   |   3 +
 .../carbondata/sdk/file/CarbonReader.java       |  17 ++
 .../carbondata/sdk/file/CarbonReaderTest.java   | 234 ++++++++++++++++++-
 5 files changed, 263 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a7ac6564/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 1359e85..0fcf4cd 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -411,7 +411,7 @@ public final class DataMapStoreManager {
   }
 
   /**
-   * this methos clears the datamap of table from memory
+   * this methods clears the datamap of table from memory
    */
   public void clearDataMaps(String tableUniqName) {
     List<TableDataMap> tableIndices = allDataMaps.get(tableUniqName);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a7ac6564/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java
index 1235789..0866395 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java
@@ -31,14 +31,14 @@ public class ChunkRowIterator extends CarbonIterator<Object[]> {
   private CarbonIterator<RowBatch> iterator;
 
   /**
-   * currect chunk
+   * current chunk
    */
-  private RowBatch currentchunk;
+  private RowBatch currentChunk;
 
   public ChunkRowIterator(CarbonIterator<RowBatch> iterator) {
     this.iterator = iterator;
     if (iterator.hasNext()) {
-      currentchunk = iterator.next();
+      currentChunk = iterator.next();
     }
   }
 
@@ -50,13 +50,13 @@ public class ChunkRowIterator extends CarbonIterator<Object[]> {
    * @return {@code true} if the iteration has more elements
    */
   @Override public boolean hasNext() {
-    if (null != currentchunk) {
-      if ((currentchunk.hasNext())) {
+    if (null != currentChunk) {
+      if ((currentChunk.hasNext())) {
         return true;
-      } else if (!currentchunk.hasNext()) {
+      } else if (!currentChunk.hasNext()) {
         while (iterator.hasNext()) {
-          currentchunk = iterator.next();
-          if (currentchunk != null && currentchunk.hasNext()) {
+          currentChunk = iterator.next();
+          if (currentChunk != null && currentChunk.hasNext()) {
             return true;
           }
         }
@@ -71,7 +71,7 @@ public class ChunkRowIterator extends CarbonIterator<Object[]> {
    * @return the next element in the iteration
    */
   @Override public Object[] next() {
-    return currentchunk.next();
+    return currentChunk.next();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a7ac6564/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index 1191a38..d4b091c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -23,6 +23,7 @@ import java.util.Map;
 
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.scan.executor.QueryExecutor;
 import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
@@ -118,6 +119,8 @@ public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
         CarbonUtil.clearDictionaryCache(entry.getValue());
       }
     }
+    // Clear the datamap cache
+    DataMapStoreManager.getInstance().getDefaultDataMap(queryModel.getTable()).clear();
     // close read support
     readSupport.close();
     try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a7ac6564/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
index c9cd8f5..6517e89 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
@@ -44,6 +44,8 @@ public class CarbonReader<T> {
 
   private int index;
 
+  private boolean initialise;
+
   /**
    * Call {@link #builder(String)} to construct an instance
    */
@@ -51,6 +53,7 @@ public class CarbonReader<T> {
     if (readers.size() == 0) {
       throw new IllegalArgumentException("no reader");
     }
+    this.initialise = true;
     this.readers = readers;
     this.index = 0;
     this.currentReader = readers.get(0);
@@ -60,6 +63,7 @@ public class CarbonReader<T> {
    * Return true if has next row
    */
   public boolean hasNext() throws IOException, InterruptedException {
+    validateReader();
     if (currentReader.nextKeyValue()) {
       return true;
     } else {
@@ -78,6 +82,7 @@ public class CarbonReader<T> {
    * Read and return next row object
    */
   public T readNextRow() throws IOException, InterruptedException {
+    validateReader();
     return currentReader.getCurrentValue();
   }
 
@@ -111,6 +116,18 @@ public class CarbonReader<T> {
    * @throws IOException
    */
   public void close() throws IOException {
+    validateReader();
     this.currentReader.close();
+    this.initialise = false;
+  }
+
+  /**
+   * Validate the reader
+   */
+  private void validateReader() {
+    if (!this.initialise) {
+      throw new RuntimeException(this.getClass().getSimpleName() +
+          " not initialise, please create it first.");
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a7ac6564/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index 937dde8..0d2c84e 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -18,21 +18,30 @@
 package org.apache.carbondata.sdk.file;
 
 import java.io.File;
+import java.io.FileFilter;
 import java.io.FilenameFilter;
 import java.io.IOException;
+import java.sql.Date;
+import java.sql.Timestamp;
 import java.util.List;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 
+import junit.framework.TestCase;
 import org.apache.commons.io.FileUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-public class CarbonReaderTest {
+public class CarbonReaderTest extends TestCase {
 
   @Before
   public void cleanFile() {
@@ -77,6 +86,99 @@ public class CarbonReaderTest {
     Assert.assertEquals(i, 100);
 
     reader.close();
+
+    // Read again
+    CarbonReader reader2 = CarbonReader
+        .builder(path, "_temp")
+        .projection(new String[]{"name", "age"})
+        .build();
+
+    i = 0;
+    while (reader2.hasNext()) {
+      Object[] row = (Object[]) reader2.readNextRow();
+      // Default sort column is applied for dimensions. So, need  to validate accordingly
+      Assert.assertEquals(name[i], row[0]);
+      Assert.assertEquals(age[i], row[1]);
+      i++;
+    }
+    Assert.assertEquals(i, 100);
+    reader2.close();
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testReadFilesParallel() throws IOException, InterruptedException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    TestUtil.writeFilesAndVerify(new Schema(fields), path, true);
+
+    CarbonReader reader = CarbonReader
+        .builder(path, "_temp")
+        .projection(new String[]{"name", "age"})
+        .build();
+    // Reader 2
+    CarbonReader reader2 = CarbonReader
+        .builder(path, "_temp")
+        .projection(new String[]{"name", "age"})
+        .build();
+
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      Object[] row2 = (Object[]) reader2.readNextRow();
+      // parallel compare
+      Assert.assertEquals(row[0], row2[0]);
+      Assert.assertEquals(row[1], row2[1]);
+    }
+
+    reader.close();
+    reader2.close();
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testReadAfterClose() throws IOException, InterruptedException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    TestUtil.writeFilesAndVerify(new Schema(fields), path, true);
+
+    CarbonReader reader = CarbonReader.builder(path, "_temp")
+        .projection(new String[]{"name", "age"}).build();
+
+    reader.close();
+    String msg = "CarbonReader not initialise, please create it first.";
+    try {
+      reader.hasNext();
+      assert (false);
+    } catch (RuntimeException e) {
+      assert (e.getMessage().equals(msg));
+    }
+
+    try {
+      reader.readNextRow();
+      assert (false);
+    } catch (RuntimeException e) {
+      assert (e.getMessage().equals(msg));
+    }
+
+    try {
+      reader.close();
+      assert (false);
+    } catch (RuntimeException e) {
+      assert (e.getMessage().equals(msg));
+    }
+
     FileUtils.deleteDirectory(new File(path));
   }
 
@@ -177,4 +279,134 @@ public class CarbonReaderTest {
     reader.close();
     FileUtils.deleteDirectory(new File(path));
   }
+
+  CarbonProperties carbonProperties;
+
+  @Override
+  public void setUp() {
+    carbonProperties = CarbonProperties.getInstance();
+  }
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonReaderTest.class.getName());
+
+  @Test
+  public void testTimeStampAndBadRecord() throws IOException, InterruptedException {
+    String timestampFormat = carbonProperties.getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
+    String badRecordAction = carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT);
+    String badRecordLoc = carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL);
+    String rootPath = new File(this.getClass().getResource("/").getPath()
+        + "../../").getCanonicalPath();
+    String storeLocation = rootPath + "/target/";
+    carbonProperties
+        .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, storeLocation)
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy-MM-dd hh:mm:ss")
+        .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "REDIRECT");
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[9];
+    fields[0] = new Field("stringField", DataTypes.STRING);
+    fields[1] = new Field("intField", DataTypes.INT);
+    fields[2] = new Field("shortField", DataTypes.SHORT);
+    fields[3] = new Field("longField", DataTypes.LONG);
+    fields[4] = new Field("doubleField", DataTypes.DOUBLE);
+    fields[5] = new Field("boolField", DataTypes.BOOLEAN);
+    fields[6] = new Field("dateField", DataTypes.DATE);
+    fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
+    fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
+
+    try {
+      CarbonWriterBuilder builder = CarbonWriter.builder()
+          .isTransactionalTable(true)
+          .persistSchemaFile(true)
+          .outputPath(path);
+
+      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
+
+      for (int i = 0; i < 100; i++) {
+        String[] row = new String[]{
+            "robot" + (i % 10),
+            String.valueOf(i),
+            String.valueOf(i),
+            String.valueOf(Long.MAX_VALUE - i),
+            String.valueOf((double) i / 2),
+            String.valueOf(true),
+            "2018-05-12",
+            "2018-05-12",
+            "12.345"
+        };
+        writer.write(row);
+        String[] row2 = new String[]{
+            "robot" + (i % 10),
+            String.valueOf(i),
+            String.valueOf(i),
+            String.valueOf(Long.MAX_VALUE - i),
+            String.valueOf((double) i / 2),
+            String.valueOf(true),
+            "2019-03-02",
+            "2019-02-12 03:03:34",
+            "12.345"
+        };
+        writer.write(row2);
+      }
+      writer.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+    LOGGER.audit("Bad record location:" + storeLocation);
+    File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
+    Assert.assertTrue(segmentFolder.exists());
+
+    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+      @Override
+      public boolean accept(File pathname) {
+        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+      }
+    });
+    Assert.assertNotNull(dataFiles);
+    Assert.assertTrue(dataFiles.length > 0);
+
+    CarbonReader reader = CarbonReader.builder(path, "_temp")
+        .projection(new String[]{
+            "stringField"
+            , "shortField"
+            , "intField"
+            , "longField"
+            , "doubleField"
+            , "boolField"
+            , "dateField"
+            , "timeField"
+            , "decimalField"}).build();
+
+    int i = 0;
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      int id = (int) row[2];
+      Assert.assertEquals("robot" + (id % 10), row[0]);
+      Assert.assertEquals(Short.parseShort(String.valueOf(id)), row[1]);
+      Assert.assertEquals(Long.MAX_VALUE - id, row[3]);
+      Assert.assertEquals((double) id / 2, row[4]);
+      Assert.assertEquals(true, (boolean) row[5]);
+      long day = 24L * 3600 * 1000;
+      Assert.assertEquals("2019-03-02", new Date((day * ((int) row[6]))).toString());
+      Assert.assertEquals("2019-02-12 03:03:34.0", new Timestamp((long) row[7] / 1000).toString());
+      i++;
+    }
+    Assert.assertEquals(i, 100);
+
+    reader.close();
+    FileUtils.deleteDirectory(new File(path));
+    carbonProperties.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        timestampFormat);
+    carbonProperties.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+        badRecordAction);
+    carbonProperties.addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+        badRecordLoc);
+  }
+
 }