Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/01/30 10:39:09 UTC

[carbondata] 05/27: [CARBONDATA-3246] Fix SDK reader issue when batch size is given as zero and vectorRead is false.

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 9235c7678853fa1c9c037131dbfa89a67babbc65
Author: shardul-cr7 <sh...@gmail.com>
AuthorDate: Wed Jan 23 11:31:04 2019 +0530

    [CARBONDATA-3246] Fix SDK reader issue when batch size is given as zero and vectorRead is false.
    
    Problem: The SDK reader fails when vectorRead is false and the detail query
    batch size (carbon.detail.batch.size) is set to 0. The query dies with a
    StackOverflowError after ChunkRowIterator.hasNext gets stuck in recursion.
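    
    A minimal sketch of the failing call pattern (the same builder calls the
    new test below exercises; "path" is assumed to point at an SDK-written
    table):
    
        // Row-based reader (vectorRead = false) built with batch size 0.
        // Before this fix, hasNext() recursed until StackOverflowError.
        CarbonReader reader = CarbonReader.builder(path)
            .withRowRecordReader()
            .withBatch(0)
            .build();
        while (reader.hasNext()) {
          Object[] row = (Object[]) reader.readNextRow();
        }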
    
    Solution: Since 0 is an invalid batch size, validate carbon.detail.batch.size
    in CarbonProperties and fall back to DETAIL_QUERY_BATCH_SIZE_DEFAULT whenever
    the configured value is outside the allowed range.
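    
    A hedged sketch of the new behavior (assuming the CarbonProperties
    singleton API this patch extends): an out-of-range value is now replaced
    at validation time, so it never reaches the iterator:
    
        CarbonProperties props = CarbonProperties.getInstance();
        // 0 is below DETAIL_QUERY_BATCH_SIZE_MIN (100), so the validation
        // added below resets the property to DETAIL_QUERY_BATCH_SIZE_DEFAULT.
        props.addProperty("carbon.detail.batch.size", "0");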
    
    This closes #3097
---
 .../core/constants/CarbonCommonConstants.java      | 10 ++++++
 .../AbstractDetailQueryResultIterator.java         |  3 --
 .../carbondata/core/util/CarbonProperties.java     | 39 ++++++++++++++++++++-
 docs/configuration-parameters.md                   |  2 +-
 .../carbondata/sdk/file/CarbonReaderTest.java      | 23 +++++++++++++
 .../sdk/file/ConcurrentSdkReaderTest.java          | 40 ----------------------
 6 files changed, 72 insertions(+), 45 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index ccc8b99..b7d9761 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1228,6 +1228,16 @@ public final class CarbonCommonConstants {
   public static final int DETAIL_QUERY_BATCH_SIZE_DEFAULT = 100;
 
   /**
+   * Maximum batch size of carbon.detail.batch.size property
+   */
+  public static final int DETAIL_QUERY_BATCH_SIZE_MAX = 1000;
+
+  /**
+   * Minimum batch size of carbon.detail.batch.size property
+   */
+  public static final int DETAIL_QUERY_BATCH_SIZE_MIN = 100;
+
+  /**
    * max driver lru cache size upto which lru cache will be loaded in memory
    */
   @CarbonProperty
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
index 30f5183..9282d44 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -94,9 +94,6 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
     if (null != batchSizeString) {
       try {
         batchSize = Integer.parseInt(batchSizeString);
-        if (0 == batchSize) {
-          batchSize = CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT;
-        }
       } catch (NumberFormatException ne) {
         LOGGER.error("Invalid inmemory records size. Using default value");
         batchSize = CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT;
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index f9131f5..49388b7 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -56,6 +56,10 @@ import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CSV_READ_BUFFER_SIZE;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_MAX;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_MIN;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_AUTO_HANDOFF;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_OFFHEAP_SORT;
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_UNSAFE_SORT;
@@ -192,6 +196,9 @@ public final class CarbonProperties {
       case CARBON_MINMAX_ALLOWED_BYTE_COUNT:
         validateStringCharacterLimit();
         break;
+      case DETAIL_QUERY_BATCH_SIZE:
+        validateDetailQueryBatchSize();
+        break;
       // TODO : Validation for carbon.lock.type should be handled for addProperty flow
       default:
         // none
@@ -256,6 +263,7 @@ public final class CarbonProperties {
     validateEnableQueryStatistics();
     validateSortMemorySpillPercentage();
     validateStringCharacterLimit();
+    validateDetailQueryBatchSize();
   }
 
   /**
@@ -1547,5 +1555,34 @@ public final class CarbonProperties {
     }
   }
 
-
+  /**
+   * This method validates DETAIL_QUERY_BATCH_SIZE. If an invalid value is set,
+   * the default value of this property is used instead.
+   */
+  private void validateDetailQueryBatchSize() {
+    String batchSizeString =
+        carbonProperties.getProperty(DETAIL_QUERY_BATCH_SIZE);
+    if (batchSizeString == null) {
+      carbonProperties.setProperty(DETAIL_QUERY_BATCH_SIZE,
+          Integer.toString(DETAIL_QUERY_BATCH_SIZE_DEFAULT));
+      LOGGER.info(
+          "Using default value for carbon.detail.batch.size " + DETAIL_QUERY_BATCH_SIZE_DEFAULT);
+    } else {
+      int batchSize;
+      try {
+        batchSize = Integer.parseInt(batchSizeString);
+        if (batchSize < DETAIL_QUERY_BATCH_SIZE_MIN || batchSize > DETAIL_QUERY_BATCH_SIZE_MAX) {
+          LOGGER.info("Invalid carbon.detail.batch.size. Using default value "
+              + DETAIL_QUERY_BATCH_SIZE_DEFAULT);
+          carbonProperties.setProperty(DETAIL_QUERY_BATCH_SIZE,
+              Integer.toString(DETAIL_QUERY_BATCH_SIZE_DEFAULT));
+        }
+      } catch (NumberFormatException ne) {
+        LOGGER.info("Invalid carbon.detail.batch.size. Using default value "
+            + DETAIL_QUERY_BATCH_SIZE_DEFAULT);
+        carbonProperties.setProperty(DETAIL_QUERY_BATCH_SIZE,
+            Integer.toString(DETAIL_QUERY_BATCH_SIZE_DEFAULT));
+      }
+    }
+  }
 }
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index c7d8152..d28ad61 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -129,7 +129,7 @@ This section provides the details of all the configurations required for the Car
 | carbon.search.master.port | 10020 | Port on which the search master listens for incoming query requests |
 | carbon.search.worker.port | 10021 | Port on which search master communicates with the workers. |
 | carbon.search.worker.workload.limit | 10 * *carbon.search.scan.thread* | Maximum number of active requests that can be sent to a worker. Beyond which the request needs to be rescheduled for later time or to a different worker. |
-| carbon.detail.batch.size | 100 | The buffer size to store records, returned from the block scan. In limit scenario this parameter is very important. For example your query limit is 1000. But if we set this value to 3000 that means we get 3000 records from scan but spark will only take 1000 rows. So the 2000 remaining are useless. In one Finance test case after we set it to 100, in the limit 1000 scenario the performance increase about 2 times in comparison to if we set this value to 12000. |
+| carbon.detail.batch.size | 100 | The buffer size to store records, returned from the block scan. In limit scenario this parameter is very important. For example your query limit is 1000. But if we set this value to 3000 that means we get 3000 records from scan but spark will only take 1000 rows. So the 2000 remaining are useless. In one Finance test case after we set it to 100, in the limit 1000 scenario the performance increase about 2 times in comparison to if we set this value to 12 [...]
 | carbon.enable.vector.reader | true | Spark added vector processing to optimize cpu cache miss and there by increase the query performance. This configuration enables to fetch data as columnar batch of size 4*1024 rows instead of fetching data row by row and provide it to spark so that there is improvement in  select queries performance. |
 | carbon.task.distribution | block | CarbonData has its own scheduling algorithm to suggest to Spark on how many tasks needs to be launched and how much work each task need to do in a Spark cluster for any query on CarbonData. Each of these task distribution suggestions has its own advantages and disadvantages. Based on the customer use case, appropriate task distribution can be configured.**block**: Setting this value will launch one task per block. This setting is suggested in case of  [...]
 | carbon.custom.block.distribution | false | CarbonData has its own scheduling algorithm to suggest to Spark on how many tasks needs to be launched and how much work each task need to do in a Spark cluster for any query on CarbonData. When this configuration is true, CarbonData would distribute the available blocks to be scanned among the available number of cores. For Example:If there are 10 blocks to be scanned and only 3 tasks can be run(only 3 executor cores available in the cluster) [...]
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index acd9e5a..28944da 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -104,6 +104,29 @@ public class CarbonReaderTest extends TestCase {
     FileUtils.deleteDirectory(new File(path));
   }
 
+  @Test public void testReadWithZeroBatchSize() throws IOException, InterruptedException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+    DataMapStoreManager.getInstance().clearDataMaps(AbsoluteTableIdentifier.from(path));
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    TestUtil.writeFilesAndVerify(10, new Schema(fields), path);
+    CarbonReader reader;
+    reader = CarbonReader.builder(path).withRowRecordReader().withBatch(0).build();
+
+    int i = 0;
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      Assert.assertEquals(("robot" + (i % 10)), row[0]);
+      Assert.assertEquals(i, row[1]);
+      i++;
+    }
+    Assert.assertEquals(10, i);
+    FileUtils.deleteDirectory(new File(path));
+  }
+
   @Test
   public void testReadWithFilterOfNonTransactionalSimple() throws IOException, InterruptedException {
     String path = "./testWriteFiles";
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java
index 31342b9..c75b70f 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java
@@ -131,46 +131,6 @@ public class ConcurrentSdkReaderTest {
       executorService.awaitTermination(10, TimeUnit.MINUTES);
     }
   }
-
-  @Test public void testReadWithZeroBatchSize() throws InterruptedException {
-    int numFiles = 5;
-    int numRowsPerFile = 5;
-    short numThreads = 4;
-    writeDataMultipleFiles(numFiles, numRowsPerFile);
-
-    // Concurrent Reading
-    ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
-    try {
-      long count;
-      CarbonReader reader =
-          CarbonReader.builder(dataDir).withRowRecordReader().withBatch(0).build();
-      List<CarbonReader> multipleReaders = reader.split(numThreads);
-      try {
-        List<ReadLogic> tasks = new ArrayList<>();
-        List<Future<Long>> results;
-        count = 0;
-
-        for (CarbonReader reader_i : multipleReaders) {
-          tasks.add(new ReadLogic(reader_i));
-        }
-        results = executorService.invokeAll(tasks);
-        for (Future result_i : results) {
-          count += (long) result_i.get();
-        }
-        Assert.assertEquals(numFiles * numRowsPerFile, count);
-      } catch (Exception e) {
-        e.printStackTrace();
-        Assert.fail(e.getMessage());
-      }
-    } catch (Exception e) {
-      e.printStackTrace();
-      Assert.fail(e.getMessage());
-    } finally {
-      executorService.shutdown();
-      executorService.awaitTermination(10, TimeUnit.MINUTES);
-    }
-  }
-
   class ReadLogic implements Callable<Long> {
     CarbonReader reader;