Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/11/21 17:59:29 UTC

[02/50] [abbrv] carbondata git commit: [CARBONDATA-3056] Added concurrent reading through SDK

[CARBONDATA-3056] Added concurrent reading through SDK

Added an API CarbonReader.split to enable concurrent reading of carbondata files through SDK.

This closes #2850
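
For reference, a minimal sketch of how the new split API can be consumed from the SDK, following the same pattern as the ConcurrentSdkReaderTest added below. The class name, data path and thread count are illustrative and not part of this commit; only builder(), build(), split(), hasNext(), readNextRow() and close() come from the patch itself.

```
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.carbondata.sdk.file.CarbonReader;

public class SplitReadExample {
  public static void main(String[] args) throws Exception {
    int numThreads = 4;  // illustrative value
    CarbonReader reader = CarbonReader.builder("./testReadFiles").build();

    // Break the reader into at most numThreads independent readers;
    // the original reader can no longer be used for iteration afterwards.
    List<CarbonReader> splits = reader.split(numThreads);

    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    List<Future<Long>> results = new ArrayList<>();
    for (CarbonReader split : splits) {
      results.add(pool.submit(() -> {
        long count = 0;
        try {
          // Each thread iterates over its own subset of the carbondata files
          while (split.hasNext()) {
            split.readNextRow();
            count++;
          }
        } finally {
          split.close();
        }
        return count;
      }));
    }

    long total = 0;
    for (Future<Long> result : results) {
      total += result.get();
    }
    pool.shutdown();
    System.out.println("Rows read: " + total);
  }
}
```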


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b9720d36
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b9720d36
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b9720d36

Branch: refs/heads/branch-1.5
Commit: b9720d361560c8224bf6caaa1af6de8a45cf2baf
Parents: e98e09a
Author: Naman Rastogi <na...@gmail.com>
Authored: Thu Oct 18 18:24:23 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Nov 21 22:37:26 2018 +0530

----------------------------------------------------------------------
 docs/sdk-guide.md                               |  20 +++
 .../carbondata/sdk/file/CarbonReader.java       |  52 ++++++
 .../sdk/file/ConcurrentSdkReaderTest.java       | 159 +++++++++++++++++++
 3 files changed, 231 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b9720d36/docs/sdk-guide.md
----------------------------------------------------------------------
diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md
index 0ee1524..cb34627 100644
--- a/docs/sdk-guide.md
+++ b/docs/sdk-guide.md
@@ -591,6 +591,26 @@ Find example code at [CarbonReaderExample](https://github.com/apache/carbondata/
 ```
 
 ```
+/**
+  * Breaks the list of CarbonRecordReader objects inside this CarbonReader into
+  * multiple CarbonReader objects, each iterating over a subset of the
+  * 'carbondata' files, and returns that list of CarbonReader objects.
+  *
+  * If the number of files is greater than maxSplits, the CarbonReader is
+  * broken into maxSplits splits, with each split iterating through
+  * one or more files.
+  *
+  * If the number of files is less than or equal to maxSplits, the returned
+  * list has one CarbonReader per file, with each CarbonReader iterating
+  * through exactly one file.
+  *
+  * @param maxSplits maximum number of CarbonReader objects to return
+  * @return list of CarbonReader objects
+  */
+  public List<CarbonReader> split(int maxSplits);
+```
+
+```
   /**
    * Return true if has next row
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b9720d36/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
index a381429..1a55a2e 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.sdk.file;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
@@ -115,6 +116,57 @@ public class CarbonReader<T> {
   }
 
   /**
+   * Breaks the list of CarbonRecordReader objects inside this CarbonReader into
+   * multiple CarbonReader objects, each iterating over a subset of the
+   * 'carbondata' files, and returns that list of CarbonReader objects.
+   *
+   * If the number of files is greater than maxSplits, the CarbonReader is
+   * broken into maxSplits splits, with each split iterating through
+   * one or more files.
+   *
+   * If the number of files is less than or equal to maxSplits, the returned
+   * list has one CarbonReader per file, with each CarbonReader iterating
+   * through exactly one file.
+   *
+   * @param maxSplits maximum number of CarbonReader objects to return
+   * @return list of {@link CarbonReader} objects
+   */
+  public List<CarbonReader> split(int maxSplits) throws IOException {
+    validateReader();
+    if (maxSplits < 1) {
+      throw new RuntimeException(
+          this.getClass().getSimpleName() + ".split: maxSplits must be positive");
+    }
+
+    List<CarbonReader> carbonReaders = new ArrayList<>();
+
+    if (maxSplits < this.readers.size()) {
+      // If maxSplits is less than the no. of files,
+      // split the reader into maxSplits splits, with each
+      // element containing >= 1 CarbonRecordReader objects
+      float filesPerSplit = (float) this.readers.size() / maxSplits;
+      for (int i = 0; i < maxSplits; ++i) {
+        carbonReaders.add(new CarbonReader<>(this.readers.subList(
+            (int) Math.ceil(i * filesPerSplit),
+            (int) Math.ceil(((i + 1) * filesPerSplit)))));
+      }
+    } else {
+      // If maxSplits is greater than or equal to the no. of files,
+      // split the reader into <num_files> splits, with each
+      // element containing exactly 1 CarbonRecordReader object
+      for (int i = 0; i < this.readers.size(); ++i) {
+        carbonReaders.add(new CarbonReader<>(this.readers.subList(i, i + 1)));
+      }
+    }
+
+    // This disables the use of this CarbonReader object for iterating
+    // over the files and forces the user to use only the returned splits
+    this.initialise = false;
+
+    return carbonReaders;
+  }
+
+  /**
    * Close reader
    *
    * @throws IOException
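
A note on the partitioning above: with the ceil-based boundaries, every CarbonRecordReader is assigned to exactly one split and split sizes differ by at most one file. A small standalone sketch of the same arithmetic, using illustrative values (10 files, maxSplits = 4) that are not taken from this commit:

```
public class SplitBoundaries {
  public static void main(String[] args) {
    int numFiles = 10;   // illustrative value
    int maxSplits = 4;   // illustrative value
    float filesPerSplit = (float) numFiles / maxSplits;  // 2.5
    for (int i = 0; i < maxSplits; ++i) {
      // Same boundary computation as in CarbonReader.split above
      int from = (int) Math.ceil(i * filesPerSplit);
      int to = (int) Math.ceil((i + 1) * filesPerSplit);
      // Prints splits [0, 3), [3, 5), [5, 8), [8, 10): sizes 3, 2, 3, 2
      System.out.println("split " + i + " -> files [" + from + ", " + to + ")");
    }
  }
}
```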

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b9720d36/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java
new file mode 100644
index 0000000..fef3319
--- /dev/null
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.*;
+
+/**
+ * Multi-threaded test suite for {@link CarbonReader}
+ */
+public class ConcurrentSdkReaderTest {
+
+  private static final String dataDir = "./testReadFiles";
+
+  @Before @After public void cleanTestData() {
+    try {
+      FileUtils.deleteDirectory(new File(dataDir));
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  private void writeDataMultipleFiles(int numFiles, long numRowsPerFile) {
+    Field[] fields = new Field[2];
+    fields[0] = new Field("stringField", DataTypes.STRING);
+    fields[1] = new Field("intField", DataTypes.INT);
+
+    for (int numFile = 0; numFile < numFiles; ++numFile) {
+      CarbonWriterBuilder builder =
+          CarbonWriter.builder().outputPath(dataDir).withCsvInput(new Schema(fields))
+              .writtenBy("ConcurrentSdkReaderTest");
+
+      try {
+        CarbonWriter writer = builder.build();
+
+        for (long i = 0; i < numRowsPerFile; ++i) {
+          writer.write(new String[] { "robot_" + i, String.valueOf(i) });
+        }
+        writer.close();
+      } catch (Exception e) {
+        e.printStackTrace();
+        Assert.fail(e.getMessage());
+      }
+    }
+  }
+
+  @Test public void testReadParallely() throws IOException, InterruptedException {
+    int numFiles = 10;
+    int numRowsPerFile = 10;
+    short numThreads = 4;
+    writeDataMultipleFiles(numFiles, numRowsPerFile);
+    long count;
+
+    // Sequential Reading
+    CarbonReader reader = CarbonReader.builder(dataDir).build();
+    try {
+      count = 0;
+      long start = System.currentTimeMillis();
+      while (reader.hasNext()) {
+        reader.readNextRow();
+        count += 1;
+      }
+      long end = System.currentTimeMillis();
+      System.out.println("[Sequential read] Time: " + (end - start) + " ms");
+      Assert.assertEquals(numFiles * numRowsPerFile, count);
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    } finally {
+      reader.close();
+    }
+
+    // Concurrent Reading
+    ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+    try {
+      CarbonReader reader2 = CarbonReader.builder(dataDir).build();
+      List<CarbonReader> multipleReaders = reader2.split(numThreads);
+      try {
+        List<ReadLogic> tasks = new ArrayList<>();
+        List<Future<Long>> results;
+        count = 0;
+
+        for (CarbonReader reader_i : multipleReaders) {
+          tasks.add(new ReadLogic(reader_i));
+        }
+        long start = System.currentTimeMillis();
+        results = executorService.invokeAll(tasks);
+        for (Future result_i : results) {
+          count += (long) result_i.get();
+        }
+        long end = System.currentTimeMillis();
+        System.out.println("[Parallel read] Time: " + (end - start) + " ms");
+        Assert.assertEquals(numFiles * numRowsPerFile, count);
+      } catch (Exception e) {
+        e.printStackTrace();
+        Assert.fail(e.getMessage());
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    } finally {
+      executorService.shutdown();
+      executorService.awaitTermination(10, TimeUnit.MINUTES);
+    }
+  }
+
+  class ReadLogic implements Callable<Long> {
+    CarbonReader reader;
+
+    ReadLogic(CarbonReader reader) {
+      this.reader = reader;
+    }
+
+    @Override public Long call() throws IOException {
+      long count = 0;
+      try {
+        while (reader.hasNext()) {
+          reader.readNextRow();
+          count += 1;
+        }
+      } catch (Exception e) {
+        e.printStackTrace();
+        Assert.fail(e.getMessage());
+      } finally {
+        reader.close();
+      }
+      return count;
+    }
+  }
+
+}
\ No newline at end of file