You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by xu...@apache.org on 2019/05/20 12:28:38 UTC

[carbondata] branch master updated: [CARBONDATA-3366] Support SDK reader to read blocklet level split

This is an automated email from the ASF dual-hosted git repository.

xubo245 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new c2d4b3e  [CARBONDATA-3366] Support SDK reader to read blocklet level split
c2d4b3e is described below

commit c2d4b3e0a012f7b2f0f5d66273ba2637fb361b73
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Fri Mar 15 10:28:14 2019 +0800

    [CARBONDATA-3366] Support SDK reader to read blocklet level split
    
    To provide more flexibility in the SDK reader, blocklet-level read support for carbondata files is required.
    With this, the SDK reader can be used in a distributed or multi-threaded environment by creating carbon readers in each worker at split level (blocklet split).
    For this, a new interface is added in CarbonReaderBuilder.
    
    This closes #3196
---
 .../LatestFilesReadCommittedScope.java             |   4 +
 .../core/readcommitter/ReadCommittedScope.java     |   8 +
 .../TableStatusReadCommittedScope.java             |   4 +
 .../apache/carbondata/sdk/file/CarbonReader.java   |  11 ++
 .../carbondata/sdk/file/CarbonReaderBuilder.java   | 169 +++++++++++++++------
 .../carbondata/sdk/file/CarbonReaderTest.java      |  33 ++++
 .../sdk/file/MultithreadSDKBlockletReaderTest.java | 144 ++++++++++++++++++
 7 files changed, 326 insertions(+), 47 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
index 7817cc8..07db7d0 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
@@ -247,4 +247,8 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
   @Override public void setConfiguration(Configuration configuration) {
     this.configuration = configuration;
   }
+
+  @Override public String getFilePath() {
+    return carbonFilePath;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
index aea7e97..2c3518f 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
@@ -54,4 +54,12 @@ import org.apache.hadoop.conf.Configuration;
   Configuration getConfiguration();
 
   void setConfiguration(Configuration configuration);
+
+  /**
+   * Returns the table path if this ReadCommittedScope is a TableStatusReadCommittedScope,
+   * or the file path if this ReadCommittedScope is a LatestFilesReadCommittedScope.
+   *
+   * @return the table path or the file path, depending on the implementation
+   */
+  String getFilePath();
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
index ac0d156..5622efe 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
@@ -112,4 +112,8 @@ public class TableStatusReadCommittedScope implements ReadCommittedScope {
   @Override public void setConfiguration(Configuration configuration) {
     this.configuration = configuration;
   }
+
+  @Override public String getFilePath() {
+    return identifier.getTablePath();
+  }
 }
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
index 1fa4e62..6ed6624 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.hadoop.CarbonRecordReader;
 import org.apache.carbondata.hadoop.util.CarbonVectorizedRecordReader;
 
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 
 
@@ -150,6 +151,16 @@ public class CarbonReader<T> {
   /**
    * Return a new {@link CarbonReaderBuilder} instance
    *
+   * @param inputSplit CarbonInputSplit Object
+   * @return CarbonReaderBuilder object
+   */
+  public static CarbonReaderBuilder builder(InputSplit inputSplit) {
+    return new CarbonReaderBuilder(inputSplit);
+  }
+
+  /**
+   * Return a new {@link CarbonReaderBuilder} instance
+   *
    * @param tablePath table path
    * @return CarbonReaderBuilder object
    */
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index d35957a..ef79b05 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -20,7 +20,9 @@ package org.apache.carbondata.sdk.file;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.UUID;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
@@ -35,10 +37,12 @@ import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonSessionInfo;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.api.CarbonFileInputFormat;
 import org.apache.carbondata.hadoop.util.CarbonVectorizedRecordReader;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
@@ -57,6 +61,7 @@ public class CarbonReaderBuilder {
   private String tableName;
   private Configuration hadoopConf;
   private boolean useVectorReader = true;
+  private InputSplit inputSplit;
   private boolean useArrowReader;
 
   /**
@@ -71,6 +76,10 @@ public class CarbonReaderBuilder {
     ThreadLocalSessionInfo.setCarbonSessionInfo(new CarbonSessionInfo());
   }
 
+  CarbonReaderBuilder(InputSplit inputSplit) {
+    this.inputSplit = inputSplit;
+    ThreadLocalSessionInfo.setCarbonSessionInfo(new CarbonSessionInfo());
+  }
 
   /**
    * Configure the projection column names of carbon reader
@@ -147,7 +156,6 @@ public class CarbonReaderBuilder {
   public CarbonReaderBuilder withHadoopConf(String key, String value) {
     if (this.hadoopConf == null) {
       this.hadoopConf = new Configuration();
-
     }
     this.hadoopConf.set(key, value);
     return this;
@@ -175,24 +183,22 @@ public class CarbonReaderBuilder {
     return (ArrowCarbonReader<T>) this.build();
   }
 
-  /**
-   * Build CarbonReader
-   *
-   * @param <T>
-   * @return CarbonReader
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public <T> CarbonReader<T> build()
-      throws IOException, InterruptedException {
-    if (hadoopConf == null) {
-      hadoopConf = FileFactory.getConfiguration();
+  private CarbonFileInputFormat prepareFileInputFormat(Job job, boolean enableBlockletDistribution,
+      boolean disableLoadBlockDataMap) throws IOException {
+    if (inputSplit != null && inputSplit instanceof CarbonInputSplit) {
+      tablePath =
+          ((CarbonInputSplit) inputSplit).getSegment().getReadCommittedScope().getFilePath();
+      tableName = "UnknownTable" + UUID.randomUUID();
+    }
+    CarbonTable table = CarbonTable.buildTable(tablePath, tableName, hadoopConf);
+    if (enableBlockletDistribution) {
+      // set cache level to blocklet level
+      Map<String, String> tableProperties =
+          table.getTableInfo().getFactTable().getTableProperties();
+      tableProperties.put(CarbonCommonConstants.CACHE_LEVEL,"BLOCKLET");
+      table.getTableInfo().getFactTable().setTableProperties(tableProperties);
     }
-    CarbonTable table;
-    // now always infer schema. TODO:Refactor in next version.
-    table = CarbonTable.buildTable(tablePath, tableName, hadoopConf);
     final CarbonFileInputFormat format = new CarbonFileInputFormat();
-    final Job job = new Job(hadoopConf);
     format.setTableInfo(job.getConfiguration(), table.getTableInfo());
     format.setTablePath(job.getConfiguration(), table.getTablePath());
     format.setTableName(job.getConfiguration(), table.getTableName());
@@ -200,11 +206,9 @@ public class CarbonReaderBuilder {
     if (filterExpression != null) {
       format.setFilterPredicates(job.getConfiguration(), filterExpression);
     }
-
     if (projectionColumns != null) {
       // set the user projection
       int len = projectionColumns.length;
-      //      TODO : Handle projection of complex child columns
       for (int i = 0; i < len; i++) {
         if (projectionColumns[i].contains(".")) {
           throw new UnsupportedOperationException(
@@ -213,40 +217,66 @@ public class CarbonReaderBuilder {
       }
       format.setColumnProjection(job.getConfiguration(), projectionColumns);
     }
+    if ((disableLoadBlockDataMap) && (filterExpression == null)) {
+      job.getConfiguration().set("filter_blocks", "false");
+    }
+    return format;
+  }
 
+  private <T> RecordReader getRecordReader(Job job, CarbonFileInputFormat format,
+      List<RecordReader<Void, T>> readers, InputSplit split)
+      throws IOException, InterruptedException {
+    TaskAttemptContextImpl attempt =
+        new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
+    RecordReader reader;
+    QueryModel queryModel = format.createQueryModel(split, attempt);
+    boolean hasComplex = false;
+    for (ProjectionDimension projectionDimension : queryModel.getProjectionDimensions()) {
+      if (projectionDimension.getDimension().isComplex()) {
+        hasComplex = true;
+        break;
+      }
+    }
+    if (useVectorReader && !hasComplex) {
+      queryModel.setDirectVectorFill(filterExpression == null);
+      reader = new CarbonVectorizedRecordReader(queryModel);
+    } else {
+      reader = format.createRecordReader(split, attempt);
+    }
     try {
+      reader.initialize(split, attempt);
+    } catch (Exception e) {
+      CarbonUtil.closeStreams(readers.toArray(new RecordReader[0]));
+      throw e;
+    }
+    return reader;
+  }
 
-      if (filterExpression == null) {
-        job.getConfiguration().set("filter_blocks", "false");
-      }
+  /**
+   * Build CarbonReader
+   *
+   * @param <T>
+   * @return CarbonReader
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public <T> CarbonReader<T> build()
+      throws IOException, InterruptedException {
+    if (inputSplit != null) {
+      return buildWithSplits(inputSplit);
+    }
+    if (hadoopConf == null) {
+      hadoopConf = FileFactory.getConfiguration();
+    }
+    final Job job = new Job(new JobConf(hadoopConf));
+    CarbonFileInputFormat format = prepareFileInputFormat(job, false, true);
+    try {
       List<InputSplit> splits =
           format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
       List<RecordReader<Void, T>> readers = new ArrayList<>(splits.size());
       for (InputSplit split : splits) {
-        TaskAttemptContextImpl attempt =
-            new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
-        RecordReader reader;
-        QueryModel queryModel = format.createQueryModel(split, attempt);
-        boolean hasComplex = false;
-        for (ProjectionDimension projectionDimension : queryModel.getProjectionDimensions()) {
-          if (projectionDimension.getDimension().isComplex()) {
-            hasComplex = true;
-            break;
-          }
-        }
-        if (useVectorReader && !hasComplex) {
-          queryModel.setDirectVectorFill(filterExpression == null);
-          reader = new CarbonVectorizedRecordReader(queryModel);
-        } else {
-          reader = format.createRecordReader(split, attempt);
-        }
-        try {
-          reader.initialize(split, attempt);
-          readers.add(reader);
-        } catch (Exception e) {
-          CarbonUtil.closeStreams(readers.toArray(new RecordReader[0]));
-          throw e;
-        }
+        RecordReader reader = getRecordReader(job, format, readers, split);
+        readers.add(reader);
       }
       if (useArrowReader) {
         return new ArrowCarbonReader<>(readers);
@@ -256,9 +286,54 @@ public class CarbonReaderBuilder {
     } catch (Exception ex) {
       // Clear the datamap cache as it can get added in getSplits() method
       DataMapStoreManager.getInstance()
-          .clearDataMaps(table.getAbsoluteTableIdentifier());
+          .clearDataMaps(format.getAbsoluteTableIdentifier(hadoopConf));
       throw ex;
     }
   }
 
+
+  private  <T> CarbonReader<T> buildWithSplits(InputSplit inputSplit)
+      throws IOException, InterruptedException {
+    if (hadoopConf == null) {
+      hadoopConf = FileFactory.getConfiguration();
+    }
+    final Job job = new Job(new JobConf(hadoopConf));
+    CarbonFileInputFormat format = prepareFileInputFormat(job, false, true);
+    try {
+      List<RecordReader<Void, T>> readers = new ArrayList<>(1);
+      RecordReader reader = getRecordReader(job, format, readers, inputSplit);
+      readers.add(reader);
+      if (useArrowReader) {
+        return new ArrowCarbonReader<>(readers);
+      } else {
+        return new CarbonReader<>(readers);
+      }
+    } catch (Exception ex) {
+      // Clear the datamap cache as it can get added in getSplits() method
+      DataMapStoreManager.getInstance()
+          .clearDataMaps(format.getAbsoluteTableIdentifier(hadoopConf));
+      throw ex;
+    }
+  }
+
+  /**
+   * Gets an array of CarbonInputSplits.
+   * In carbondata, splits can be block level or blocklet level.
+   * By default, splits are block level.
+   *
+   * @param enableBlockletDistribution returns blocklet level splits if set to true,
+   *                                   else block level splits.
+   * @return the splits computed by the input format, as an array
+   * @throws IOException
+   */
+  public InputSplit[] getSplits(boolean enableBlockletDistribution) throws IOException {
+    if (hadoopConf == null) {
+      hadoopConf = FileFactory.getConfiguration();
+    }
+    final Job job = new Job(new JobConf(hadoopConf));
+    CarbonFileInputFormat format = prepareFileInputFormat(job, enableBlockletDistribution, false);
+    List<InputSplit> splits =
+        format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
+    return splits.toArray(new InputSplit[splits.size()]);
+  }
 }
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index 1607b8f..1073428 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -27,6 +27,7 @@ import org.apache.arrow.vector.Float4Vector;
 import org.apache.arrow.vector.SmallIntVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.avro.generic.GenericData;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.log4j.Logger;
 
 import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
@@ -2553,4 +2554,36 @@ public class CarbonReaderTest extends TestCase {
     }
   }
 
+
+  @Test
+  public void testReadBlocklet() throws IOException, InterruptedException {
+    String path = "./testWriteFiles/" + System.nanoTime();
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    TestUtil.writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, 1, 100);
+
+    InputSplit[] splits = CarbonReader.builder(path).getSplits(true);
+    // check for 3 blocklet count (as only one carbon file will be created)
+    Assert.assertEquals(splits.length, 3);
+
+    int totalCount = 0;
+    for (int k = 0; k < splits.length; k++) {
+      CarbonReader reader = CarbonReader
+          .builder(splits[k])
+          .build();
+      int i = 0;
+      while (reader.hasNext()) {
+        Object[] row = (Object[]) reader.readNextRow();
+        i++;
+      }
+      totalCount += i;
+      reader.close();
+    }
+    Assert.assertEquals(totalCount, 1000000);
+    FileUtils.deleteDirectory(new File(path));
+  }
 }
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/MultithreadSDKBlockletReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/MultithreadSDKBlockletReaderTest.java
new file mode 100644
index 0000000..fc6beef
--- /dev/null
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/MultithreadSDKBlockletReaderTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Multi-threaded test suite for blocklet-level reads using {@link CarbonReader}
+ */
+public class MultithreadSDKBlockletReaderTest {
+
+  private static final String dataDir = "./testReadFiles";
+
+  @Before @After public void cleanTestData() {
+    try {
+      FileUtils.deleteDirectory(new File(dataDir));
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  private void writeDataMultipleFiles(int numFiles, long numRowsPerFile) {
+    Field[] fields = new Field[2];
+    fields[0] = new Field("stringField", DataTypes.STRING);
+    fields[1] = new Field("intField", DataTypes.INT);
+
+    for (int numFile = 0; numFile < numFiles; ++numFile) {
+      CarbonWriterBuilder builder =
+          CarbonWriter.builder().outputPath(dataDir).withCsvInput(new Schema(fields))
+              .withBlockletSize(1).withBlockletSize(2).writtenBy("ConcurrentSdkReaderTest");
+      try {
+        CarbonWriter writer = builder.build();
+
+        for (long i = 0; i < numRowsPerFile; ++i) {
+          writer.write(new String[] { "robot_" + i, String.valueOf(i) });
+        }
+        writer.close();
+      } catch (Exception e) {
+        e.printStackTrace();
+        Assert.fail(e.getMessage());
+      }
+    }
+  }
+
+  @Test public void testReadParallely() throws IOException, InterruptedException {
+    int numFiles = 2;
+    int numRowsPerFile = 1000000;
+    short numThreads;
+    writeDataMultipleFiles(numFiles, numRowsPerFile);
+    long count;
+
+    InputSplit[] splits = CarbonReader.builder(dataDir).getSplits(true);
+    Assert.assertEquals(splits.length, 8);
+    numThreads = (short) splits.length;
+    // Concurrent Reading
+    ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+    try {
+      try {
+        List<ReadLogic> tasks = new ArrayList<>();
+        List<Future<Long>> results;
+        count = 0;
+        for (InputSplit split : splits) {
+          tasks.add(new ReadLogic(split));
+        }
+        long start = System.currentTimeMillis();
+        results = executorService.invokeAll(tasks);
+        for (Future result_i : results) {
+          count += (long) result_i.get();
+        }
+        long end = System.currentTimeMillis();
+        System.out.println("[Parallel read] Time: " + (end - start) + " ms");
+        Assert.assertEquals(numFiles * numRowsPerFile, count);
+      } catch (Exception e) {
+        e.printStackTrace();
+        Assert.fail(e.getMessage());
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    } finally {
+      executorService.shutdown();
+      executorService.awaitTermination(10, TimeUnit.MINUTES);
+    }
+  }
+
+  class ReadLogic implements Callable<Long> {
+    InputSplit split;
+
+    ReadLogic(InputSplit split) {
+      this.split = split;
+    }
+
+    @Override public Long call() throws IOException, InterruptedException {
+      long count = 0;
+      CarbonReader reader = CarbonReader.builder(split).build();
+      try {
+        while (reader.hasNext()) {
+          reader.readNextRow();
+          count += 1;
+        }
+      } catch (Exception e) {
+        e.printStackTrace();
+        Assert.fail(e.getMessage());
+      } finally {
+        reader.close();
+      }
+      return count;
+    }
+  }
+
+}
\ No newline at end of file