You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/12/10 14:01:29 UTC

carbondata git commit: [CARBONDATA-3145] Avoid duplicate decoding for complex column pages while querying

Repository: carbondata
Updated Branches:
  refs/heads/master 4c9f08217 -> 0c94559e2


[CARBONDATA-3145] Avoid duplicate decoding for complex column pages while querying

Problem:
The column page is decoded again for every row that is read from a complex primitive column.

Solution:
Decode each page once, then reuse the decoded page for all rows.

This closes #2975


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0c94559e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0c94559e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0c94559e

Branch: refs/heads/master
Commit: 0c94559e2feaf3d5a001665c3da2bfc3bf941043
Parents: 4c9f082
Author: dhatchayani <dh...@gmail.com>
Authored: Wed Dec 5 12:40:56 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Mon Dec 10 19:31:12 2018 +0530

----------------------------------------------------------------------
 .../core/scan/complextypes/ArrayQueryType.java  | 11 ++--
 .../scan/complextypes/ComplexQueryType.java     | 14 +++-
 .../scan/complextypes/PrimitiveQueryType.java   | 11 ++--
 .../core/scan/complextypes/StructQueryType.java | 14 ++--
 .../core/scan/filter/GenericQueryType.java      |  4 +-
 .../executer/RowLevelFilterExecuterImpl.java    |  7 +-
 .../core/scan/result/BlockletScannedResult.java | 68 +++++++++++++-------
 7 files changed, 86 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c94559e/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
index a5f4234..8538edb 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Map;
 
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.scan.filter.GenericQueryType;
@@ -62,17 +63,17 @@ public class ArrayQueryType extends ComplexQueryType implements GenericQueryType
   }
 
   public void parseBlocksAndReturnComplexColumnByteArray(DimensionRawColumnChunk[] rawColumnChunks,
-      int rowNumber, int pageNumber, DataOutputStream dataOutputStream) throws IOException {
-    byte[] input = copyBlockDataChunk(rawColumnChunks, rowNumber, pageNumber);
+      DimensionColumnPage[][] dimensionColumnPages, int rowNumber, int pageNumber,
+      DataOutputStream dataOutputStream) throws IOException {
+    byte[] input = copyBlockDataChunk(rawColumnChunks, dimensionColumnPages, rowNumber, pageNumber);
     ByteBuffer byteArray = ByteBuffer.wrap(input);
     int dataLength = byteArray.getInt();
     dataOutputStream.writeInt(dataLength);
     if (dataLength > 0) {
       int dataOffset = byteArray.getInt();
       for (int i = 0; i < dataLength; i++) {
-        children
-            .parseBlocksAndReturnComplexColumnByteArray(rawColumnChunks, dataOffset++, pageNumber,
-                dataOutputStream);
+        children.parseBlocksAndReturnComplexColumnByteArray(rawColumnChunks, dimensionColumnPages,
+            dataOffset++, pageNumber, dataOutputStream);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c94559e/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java
index 98f0715..704af89 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.scan.complextypes;
 
 import java.io.IOException;
 
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
 
@@ -40,9 +41,10 @@ public class ComplexQueryType {
    * This method is also used by child.
    */
   protected byte[] copyBlockDataChunk(DimensionRawColumnChunk[] rawColumnChunks,
-      int rowNumber, int pageNumber) {
+      DimensionColumnPage[][] dimensionColumnPages, int rowNumber, int pageNumber) {
     byte[] data =
-        rawColumnChunks[blockIndex].decodeColumnPage(pageNumber).getChunkData(rowNumber);
+        getDecodedDimensionPage(dimensionColumnPages, rawColumnChunks[blockIndex], pageNumber)
+            .getChunkData(rowNumber);
     byte[] output = new byte[data.length];
     System.arraycopy(data, 0, output, 0, output.length);
     return output;
@@ -57,4 +59,12 @@ public class ComplexQueryType {
           .readDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
     }
   }
+
+  private DimensionColumnPage getDecodedDimensionPage(DimensionColumnPage[][] dimensionColumnPages,
+      DimensionRawColumnChunk dimensionRawColumnChunk, int pageNumber) {
+    if (dimensionColumnPages == null || null == dimensionColumnPages[blockIndex]) {
+      return dimensionRawColumnChunk.decodeColumnPage(pageNumber);
+    }
+    return dimensionColumnPages[blockIndex][pageNumber];
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c94559e/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
index abe33c4..6347397 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
@@ -93,10 +94,12 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
     return 1;
   }
 
-  @Override public void parseBlocksAndReturnComplexColumnByteArray(
-      DimensionRawColumnChunk[] rawColumnChunks, int rowNumber,
-      int pageNumber, DataOutputStream dataOutputStream) throws IOException {
-    byte[] currentVal = copyBlockDataChunk(rawColumnChunks, rowNumber, pageNumber);
+  @Override
+  public void parseBlocksAndReturnComplexColumnByteArray(DimensionRawColumnChunk[] rawColumnChunks,
+      DimensionColumnPage[][] dimensionColumnPages, int rowNumber, int pageNumber,
+      DataOutputStream dataOutputStream) throws IOException {
+    byte[] currentVal =
+        copyBlockDataChunk(rawColumnChunks, dimensionColumnPages, rowNumber, pageNumber);
     if (!this.isDictionary && !this.isDirectDictionary) {
       dataOutputStream.writeShort(currentVal.length);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c94559e/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
index c607f84..7bccbc0 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.scan.filter.GenericQueryType;
@@ -79,17 +80,18 @@ public class StructQueryType extends ComplexQueryType implements GenericQueryTyp
   }
 
   @Override public void parseBlocksAndReturnComplexColumnByteArray(
-      DimensionRawColumnChunk[] dimensionColumnDataChunks, int rowNumber,
-      int pageNumber, DataOutputStream dataOutputStream) throws IOException {
-    byte[] input = copyBlockDataChunk(dimensionColumnDataChunks, rowNumber, pageNumber);
+      DimensionRawColumnChunk[] dimensionColumnDataChunks,
+      DimensionColumnPage[][] dimensionColumnPages, int rowNumber, int pageNumber,
+      DataOutputStream dataOutputStream) throws IOException {
+    byte[] input =
+        copyBlockDataChunk(dimensionColumnDataChunks, dimensionColumnPages, rowNumber, pageNumber);
     ByteBuffer byteArray = ByteBuffer.wrap(input);
     int childElement = byteArray.getShort();
     dataOutputStream.writeShort(childElement);
     if (childElement > 0) {
       for (int i = 0; i < childElement; i++) {
-        children.get(i)
-            .parseBlocksAndReturnComplexColumnByteArray(dimensionColumnDataChunks, rowNumber,
-                pageNumber, dataOutputStream);
+        children.get(i).parseBlocksAndReturnComplexColumnByteArray(dimensionColumnDataChunks,
+            dimensionColumnPages, rowNumber, pageNumber, dataOutputStream);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c94559e/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
index 6c087d7..b43062e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Map;
 
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
@@ -41,7 +42,8 @@ public interface GenericQueryType {
   int getColsCount();
 
   void parseBlocksAndReturnComplexColumnByteArray(DimensionRawColumnChunk[] rawColumnChunks,
-      int rowNumber, int pageNumber, DataOutputStream dataOutputStream) throws IOException;
+      DimensionColumnPage[][] dimensionColumnPages, int rowNumber, int pageNumber,
+      DataOutputStream dataOutputStream) throws IOException;
 
   void fillRequiredBlockData(RawBlockletColumnChunks blockChunkHolder) throws IOException;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c94559e/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
index 7ca2579..63ae0cd 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
@@ -457,9 +457,10 @@ public class RowLevelFilterExecuterImpl implements FilterExecuter {
           ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
           DataOutputStream dataOutputStream = new DataOutputStream(byteStream);
           complexType.parseBlocksAndReturnComplexColumnByteArray(
-              blockChunkHolder.getDimensionRawColumnChunks(), index, pageIndex, dataOutputStream);
-          record[dimColumnEvaluatorInfo.getRowIndex()] = complexType
-              .getDataBasedOnDataType(ByteBuffer.wrap(byteStream.toByteArray()));
+              blockChunkHolder.getDimensionRawColumnChunks(), null, index, pageIndex,
+              dataOutputStream);
+          record[dimColumnEvaluatorInfo.getRowIndex()] =
+              complexType.getDataBasedOnDataType(ByteBuffer.wrap(byteStream.toByteArray()));
           byteStream.close();
         } catch (IOException e) {
           LOGGER.info(e.getMessage());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c94559e/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
index bb373eb..c04df52 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
@@ -17,7 +17,6 @@
 package org.apache.carbondata.core.scan.result;
 
 import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
@@ -46,6 +45,7 @@ import org.apache.carbondata.core.stats.QueryStatistic;
 import org.apache.carbondata.core.stats.QueryStatisticsConstants;
 import org.apache.carbondata.core.stats.QueryStatisticsModel;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.ReUsableByteArrayDataOutputStream;
 
 import org.apache.log4j.Logger;
 
@@ -282,30 +282,38 @@ public abstract class BlockletScannedResult {
   }
 
   public void fillColumnarComplexBatch(ColumnVectorInfo[] vectorInfos) {
+    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+    ReUsableByteArrayDataOutputStream reuseableDataOutput =
+        new ReUsableByteArrayDataOutputStream(byteStream);
+    boolean isExceptionThrown = false;
     for (int i = 0; i < vectorInfos.length; i++) {
       int offset = vectorInfos[i].offset;
       int len = offset + vectorInfos[i].size;
       int vectorOffset = vectorInfos[i].vectorOffset;
       CarbonColumnVector vector = vectorInfos[i].vector;
       for (int j = offset; j < len; j++) {
-        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
-        DataOutputStream dataOutput = new DataOutputStream(byteStream);
         try {
-          vectorInfos[i].genericQueryType.parseBlocksAndReturnComplexColumnByteArray(
-              dimRawColumnChunks,
-              pageFilteredRowId == null ? j : pageFilteredRowId[pageCounter][j], pageCounter,
-              dataOutput);
+          vectorInfos[i].genericQueryType
+              .parseBlocksAndReturnComplexColumnByteArray(dimRawColumnChunks, dimensionColumnPages,
+                  pageFilteredRowId == null ? j : pageFilteredRowId[pageCounter][j], pageCounter,
+                  reuseableDataOutput);
           Object data = vectorInfos[i].genericQueryType
-              .getDataBasedOnDataType(ByteBuffer.wrap(byteStream.toByteArray()));
+              .getDataBasedOnDataType(ByteBuffer.wrap(reuseableDataOutput.getByteArray()));
           vector.putObject(vectorOffset++, data);
+          reuseableDataOutput.reset();
         } catch (IOException e) {
+          isExceptionThrown = true;
           LOGGER.error(e);
         } finally {
-          CarbonUtil.closeStreams(dataOutput);
-          CarbonUtil.closeStreams(byteStream);
+          if (isExceptionThrown) {
+            CarbonUtil.closeStreams(reuseableDataOutput);
+            CarbonUtil.closeStreams(byteStream);
+          }
         }
       }
     }
+    CarbonUtil.closeStreams(reuseableDataOutput);
+    CarbonUtil.closeStreams(byteStream);
   }
 
   /**
@@ -541,6 +549,10 @@ public abstract class BlockletScannedResult {
    */
   protected List<byte[][]> getComplexTypeKeyArrayBatch() {
     List<byte[][]> complexTypeArrayList = new ArrayList<>(validRowIds.size());
+    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+    ReUsableByteArrayDataOutputStream reUseableDataOutput =
+        new ReUsableByteArrayDataOutputStream(byteStream);
+    boolean isExceptionThrown = false;
     byte[][] complexTypeData = null;
     // everyTime it is initialized new as in case of prefetch it can modify the data
     for (int i = 0; i < validRowIds.size(); i++) {
@@ -552,23 +564,27 @@ public abstract class BlockletScannedResult {
       GenericQueryType genericQueryType =
           complexParentIndexToQueryMap.get(complexParentBlockIndexes[i]);
       for (int j = 0; j < validRowIds.size(); j++) {
-        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
-        DataOutputStream dataOutput = new DataOutputStream(byteStream);
         try {
           genericQueryType
-              .parseBlocksAndReturnComplexColumnByteArray(dimRawColumnChunks, validRowIds.get(j),
-                  pageCounter, dataOutput);
+              .parseBlocksAndReturnComplexColumnByteArray(dimRawColumnChunks, dimensionColumnPages,
+                  validRowIds.get(j), pageCounter, reUseableDataOutput);
           // get the key array in columnar way
           byte[][] complexKeyArray = complexTypeArrayList.get(j);
           complexKeyArray[i] = byteStream.toByteArray();
+          reUseableDataOutput.reset();
         } catch (IOException e) {
+          isExceptionThrown = true;
           LOGGER.error(e);
         } finally {
-          CarbonUtil.closeStreams(dataOutput);
-          CarbonUtil.closeStreams(byteStream);
+          if (isExceptionThrown) {
+            CarbonUtil.closeStreams(reUseableDataOutput);
+            CarbonUtil.closeStreams(byteStream);
+          }
         }
       }
     }
+    CarbonUtil.closeStreams(reUseableDataOutput);
+    CarbonUtil.closeStreams(byteStream);
     return complexTypeArrayList;
   }
 
@@ -607,24 +623,32 @@ public abstract class BlockletScannedResult {
    * @return complex type key array for all the complex dimension selected in query
    */
   protected byte[][] getComplexTypeKeyArray(int rowId) {
+    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+    ReUsableByteArrayDataOutputStream reUsableDataOutput =
+        new ReUsableByteArrayDataOutputStream(byteStream);
+    boolean isExceptionThrown = false;
     byte[][] complexTypeData = new byte[complexParentBlockIndexes.length][];
     for (int i = 0; i < complexTypeData.length; i++) {
       GenericQueryType genericQueryType =
           complexParentIndexToQueryMap.get(complexParentBlockIndexes[i]);
-      ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
-      DataOutputStream dataOutput = new DataOutputStream(byteStream);
       try {
         genericQueryType
-            .parseBlocksAndReturnComplexColumnByteArray(dimRawColumnChunks, rowId, pageCounter,
-                dataOutput);
+            .parseBlocksAndReturnComplexColumnByteArray(dimRawColumnChunks, dimensionColumnPages,
+                rowId, pageCounter, reUsableDataOutput);
         complexTypeData[i] = byteStream.toByteArray();
+        reUsableDataOutput.reset();
       } catch (IOException e) {
+        isExceptionThrown = true;
         LOGGER.error(e);
       } finally {
-        CarbonUtil.closeStreams(dataOutput);
-        CarbonUtil.closeStreams(byteStream);
+        if (isExceptionThrown) {
+          CarbonUtil.closeStreams(reUsableDataOutput);
+          CarbonUtil.closeStreams(byteStream);
+        }
       }
     }
+    CarbonUtil.closeStreams(reUsableDataOutput);
+    CarbonUtil.closeStreams(byteStream);
     return complexTypeData;
   }