You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/11/21 17:59:29 UTC
[02/50] [abbrv] carbondata git commit: [CARBONDATA-3056] Added
concurrent reading through SDK
[CARBONDATA-3056] Added concurrent reading through SDK
Added an API CarbonReader.split to enable concurrent reading of carbondata files through SDK.
This closes #2850
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b9720d36
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b9720d36
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b9720d36
Branch: refs/heads/branch-1.5
Commit: b9720d361560c8224bf6caaa1af6de8a45cf2baf
Parents: e98e09a
Author: Naman Rastogi <na...@gmail.com>
Authored: Thu Oct 18 18:24:23 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Nov 21 22:37:26 2018 +0530
----------------------------------------------------------------------
docs/sdk-guide.md | 20 +++
.../carbondata/sdk/file/CarbonReader.java | 52 ++++++
.../sdk/file/ConcurrentSdkReaderTest.java | 159 +++++++++++++++++++
3 files changed, 231 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b9720d36/docs/sdk-guide.md
----------------------------------------------------------------------
diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md
index 0ee1524..cb34627 100644
--- a/docs/sdk-guide.md
+++ b/docs/sdk-guide.md
@@ -591,6 +591,26 @@ Find example code at [CarbonReaderExample](https://github.com/apache/carbondata/
```
```
+/**
+ * Breaks the list of CarbonRecordReader in CarbonReader into multiple
+ * CarbonReader objects, each iterating through some 'carbondata' files
+ * and return that list of CarbonReader objects
+ *
+ * If the no. of files is greater than maxSplits, then break the
+ * CarbonReader into maxSplits splits, with each split iterating
+ * through >= 1 file.
+ *
+ * If the no. of files is less than or equal to maxSplits, then return list of
+ * CarbonReader with size as the no. of files, with each CarbonReader
+ * iterating through exactly one file
+ *
+ * @param maxSplits maximum number of CarbonReader objects to return; must be positive
+ * @return list of CarbonReader objects
+ */
+ public List<CarbonReader> split(int maxSplits);
+```
+
+```
/**
* Return true if has next row
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b9720d36/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
index a381429..1a55a2e 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
@@ -18,6 +18,7 @@
package org.apache.carbondata.sdk.file;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@@ -115,6 +116,57 @@ public class CarbonReader<T> {
}
/**
+ * Breaks the list of CarbonRecordReader in CarbonReader into multiple
+ * CarbonReader objects, each iterating through some 'carbondata' files
+ * and return that list of CarbonReader objects
+ *
+ * If the no. of files is greater than maxSplits, then break the
+ * CarbonReader into maxSplits splits, with each split iterating
+ * through >= 1 file.
+ *
+ * If the no. of files is less than maxSplits, then return list of
+ * CarbonReader with size as the no. of files, with each CarbonReader
+ * iterating through exactly one file
+ *
+ * @param maxSplits: Int
+ * @return list of {@link CarbonReader} objects
+ */
+ public List<CarbonReader> split(int maxSplits) throws IOException {
+ validateReader();
+ if (maxSplits < 1) {
+ throw new RuntimeException(
+ this.getClass().getSimpleName() + ".split: maxSplits must be positive");
+ }
+
+ List<CarbonReader> carbonReaders = new ArrayList<>();
+
+ if (maxSplits < this.readers.size()) {
+ // If maxSplits is less than the no. of files
+ // Split the reader into maxSplits splits with each
+ // element containing >= 1 CarbonRecordReader objects
+ float filesPerSplit = (float) this.readers.size() / maxSplits;
+ for (int i = 0; i < maxSplits; ++i) {
+ carbonReaders.add(new CarbonReader<>(this.readers.subList(
+ (int) Math.ceil(i * filesPerSplit),
+ (int) Math.ceil(((i + 1) * filesPerSplit)))));
+ }
+ } else {
+ // If maxSplits is greater than the no. of files
+ // Split the reader into <num_files> splits with each
+ // element contains exactly 1 CarbonRecordReader object
+ for (int i = 0; i < this.readers.size(); ++i) {
+ carbonReaders.add(new CarbonReader<>(this.readers.subList(i, i + 1)));
+ }
+ }
+
+ // This is to disable the use of this CarbonReader object to iterate
+ // over the files and forces user to only use the returned splits
+ this.initialise = false;
+
+ return carbonReaders;
+ }
+
+ /**
* Close reader
*
* @throws IOException
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b9720d36/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java
new file mode 100644
index 0000000..fef3319
--- /dev/null
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkReaderTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.*;
+
+/**
+ * Multi-threaded test suite for {@link CarbonReader}: reads the same set of
+ * files once sequentially and once through {@link CarbonReader#split(int)}
+ * and checks that both ways return the same number of rows.
+ */
+public class ConcurrentSdkReaderTest {
+
+  private static final String dataDir = "./testReadFiles";
+
+  /**
+   * Deletes the test data directory before and after every test.
+   */
+  @Before @After public void cleanTestData() {
+    try {
+      FileUtils.deleteDirectory(new File(dataDir));
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Runs numFiles separate writers against the test data directory, each
+   * writing numRowsPerFile rows of ("robot_i", i).
+   */
+  private void writeDataMultipleFiles(int numFiles, long numRowsPerFile) {
+    Field[] fields = new Field[2];
+    fields[0] = new Field("stringField", DataTypes.STRING);
+    fields[1] = new Field("intField", DataTypes.INT);
+
+    for (int numFile = 0; numFile < numFiles; ++numFile) {
+      CarbonWriterBuilder builder =
+          CarbonWriter.builder().outputPath(dataDir).withCsvInput(new Schema(fields))
+              .writtenBy("ConcurrentSdkReaderTest");
+
+      try {
+        CarbonWriter writer = builder.build();
+
+        for (long i = 0; i < numRowsPerFile; ++i) {
+          writer.write(new String[] { "robot_" + i, String.valueOf(i) });
+        }
+        writer.close();
+      } catch (Exception e) {
+        e.printStackTrace();
+        Assert.fail(e.getMessage());
+      }
+    }
+  }
+
+  @Test public void testReadParallely() throws IOException, InterruptedException {
+    int numFiles = 10;
+    int numRowsPerFile = 10;
+    short numThreads = 4;
+    writeDataMultipleFiles(numFiles, numRowsPerFile);
+    long count;
+
+    // Sequential Reading
+    CarbonReader reader = CarbonReader.builder(dataDir).build();
+    try {
+      count = 0;
+      long start = System.currentTimeMillis();
+      while (reader.hasNext()) {
+        reader.readNextRow();
+        count += 1;
+      }
+      long end = System.currentTimeMillis();
+      System.out.println("[Sequential read] Time: " + (end - start) + " ms");
+      Assert.assertEquals(numFiles * numRowsPerFile, count);
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    } finally {
+      reader.close();
+    }
+
+    // Concurrent Reading
+    ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+    try {
+      CarbonReader reader2 = CarbonReader.builder(dataDir).build();
+      List<CarbonReader> multipleReaders = reader2.split(numThreads);
+
+      // One task per split; each task closes its own reader when done
+      List<ReadLogic> tasks = new ArrayList<>(multipleReaders.size());
+      for (CarbonReader splitReader : multipleReaders) {
+        tasks.add(new ReadLogic(splitReader));
+      }
+
+      count = 0;
+      long start = System.currentTimeMillis();
+      List<Future<Long>> results = executorService.invokeAll(tasks);
+      for (Future<Long> result : results) {
+        // get() rethrows any failure of the task wrapped in an ExecutionException
+        count += result.get();
+      }
+      long end = System.currentTimeMillis();
+      System.out.println("[Parallel read] Time: " + (end - start) + " ms");
+      Assert.assertEquals(numFiles * numRowsPerFile, count);
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    } finally {
+      executorService.shutdown();
+      executorService.awaitTermination(10, TimeUnit.MINUTES);
+    }
+  }
+
+  /**
+   * Task that drains a single {@link CarbonReader}, closes it and returns the
+   * number of rows it read.
+   */
+  static class ReadLogic implements Callable<Long> {
+    private final CarbonReader reader;
+
+    ReadLogic(CarbonReader reader) {
+      this.reader = reader;
+    }
+
+    @Override public Long call() throws IOException {
+      long count = 0;
+      try {
+        while (reader.hasNext()) {
+          reader.readNextRow();
+          count += 1;
+        }
+      } catch (Exception e) {
+        e.printStackTrace();
+        Assert.fail(e.getMessage());
+      } finally {
+        reader.close();
+      }
+      return count;
+    }
+  }
+
+}
\ No newline at end of file