Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/04 12:24:26 UTC
[06/50] [abbrv] carbondata git commit: [CARBONDATA-2099] Refactor query scan process to improve readability
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
index f4450e3..5f8d199 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
@@ -31,7 +31,7 @@ import java.util.Map;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionColumnPage;
import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
import org.apache.carbondata.core.datastore.filesystem.LocalCarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -45,7 +45,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.scan.model.QueryDimension;
+import org.apache.carbondata.core.scan.model.ProjectionDimension;
import mockit.Mock;
import mockit.MockUp;
@@ -267,8 +267,8 @@ public class CarbonUtilTest {
@Test public void testToGetNextLesserValue() {
byte[] dataChunks = { 5, 6, 7, 8, 9 };
byte[] compareValues = { 7 };
- FixedLengthDimensionDataChunk fixedLengthDataChunk =
- new FixedLengthDimensionDataChunk(dataChunks, null, null, 5, 1);
+ FixedLengthDimensionColumnPage fixedLengthDataChunk =
+ new FixedLengthDimensionColumnPage(dataChunks, null, null, 5, 1);
int result = CarbonUtil.nextLesserValueToTarget(2, fixedLengthDataChunk, compareValues);
assertEquals(result, 1);
}
@@ -276,8 +276,8 @@ public class CarbonUtilTest {
@Test public void testToGetNextLesserValueToTarget() {
byte[] dataChunks = { 7, 7, 7, 8, 9 };
byte[] compareValues = { 7 };
- FixedLengthDimensionDataChunk fixedLengthDataChunk =
- new FixedLengthDimensionDataChunk(dataChunks, null, null, 5, 1);
+ FixedLengthDimensionColumnPage fixedLengthDataChunk =
+ new FixedLengthDimensionColumnPage(dataChunks, null, null, 5, 1);
int result = CarbonUtil.nextLesserValueToTarget(2, fixedLengthDataChunk, compareValues);
assertEquals(result, -1);
}
@@ -285,8 +285,8 @@ public class CarbonUtilTest {
@Test public void testToGetnextGreaterValue() {
byte[] dataChunks = { 5, 6, 7, 8, 9 };
byte[] compareValues = { 7 };
- FixedLengthDimensionDataChunk fixedLengthDataChunk =
- new FixedLengthDimensionDataChunk(dataChunks, null, null, 5, 1);
+ FixedLengthDimensionColumnPage fixedLengthDataChunk =
+ new FixedLengthDimensionColumnPage(dataChunks, null, null, 5, 1);
int result = CarbonUtil.nextGreaterValueToTarget(2, fixedLengthDataChunk, compareValues, 5);
assertEquals(result, 3);
}
@@ -302,8 +302,8 @@ public class CarbonUtilTest {
@Test public void testToGetnextGreaterValueToTarget() {
byte[] dataChunks = { 5, 6, 7, 7, 7 };
byte[] compareValues = { 7 };
- FixedLengthDimensionDataChunk fixedLengthDataChunk =
- new FixedLengthDimensionDataChunk(dataChunks, null, null, 5, 1);
+ FixedLengthDimensionColumnPage fixedLengthDataChunk =
+ new FixedLengthDimensionColumnPage(dataChunks, null, null, 5, 1);
int result = CarbonUtil.nextGreaterValueToTarget(2, fixedLengthDataChunk, compareValues, 5);
assertEquals(result, 5);
}
@@ -525,23 +525,23 @@ public class CarbonUtilTest {
}
@Test public void testToGetDictionaryEncodingArray() {
- QueryDimension column1 = new QueryDimension("Column1");
- QueryDimension column2 = new QueryDimension("Column2");
ColumnSchema column1Schema = new ColumnSchema();
ColumnSchema column2Schema = new ColumnSchema();
column1Schema.setColumnName("Column1");
List<Encoding> encoding = new ArrayList<>();
encoding.add(Encoding.DICTIONARY);
column1Schema.setEncodingList(encoding);
- column1.setDimension(new CarbonDimension(column1Schema, 1, 1, 1, 1));
+ ProjectionDimension
+ column1 = new ProjectionDimension(new CarbonDimension(column1Schema, 1, 1, 1, 1));
column2Schema.setColumnName("Column2");
List<Encoding> encoding2 = new ArrayList<>();
encoding2.add(Encoding.DELTA);
column2Schema.setEncodingList(encoding2);
- column2.setDimension(new CarbonDimension(column2Schema, 1, 1, 1, 1));
+ ProjectionDimension
+ column2 = new ProjectionDimension(new CarbonDimension(column2Schema, 1, 1, 1, 1));
- QueryDimension[] queryDimensions = { column1, column2 };
+ ProjectionDimension[] queryDimensions = { column1, column2 };
boolean[] dictionaryEncoding = CarbonUtil.getDictionaryEncodingArray(queryDimensions);
boolean[] expectedDictionaryEncoding = { true, false };
@@ -551,23 +551,23 @@ public class CarbonUtilTest {
}
@Test public void testToGetDirectDictionaryEncodingArray() {
- QueryDimension column1 = new QueryDimension("Column1");
- QueryDimension column2 = new QueryDimension("Column2");
ColumnSchema column1Schema = new ColumnSchema();
ColumnSchema column2Schema = new ColumnSchema();
column1Schema.setColumnName("Column1");
List<Encoding> encoding = new ArrayList<>();
encoding.add(Encoding.DIRECT_DICTIONARY);
column1Schema.setEncodingList(encoding);
- column1.setDimension(new CarbonDimension(column1Schema, 1, 1, 1, 1));
+ ProjectionDimension
+ column1 = new ProjectionDimension(new CarbonDimension(column1Schema, 1, 1, 1, 1));
column2Schema.setColumnName("Column2");
List<Encoding> encoding2 = new ArrayList<>();
encoding2.add(Encoding.DELTA);
column2Schema.setEncodingList(encoding2);
- column2.setDimension(new CarbonDimension(column2Schema, 1, 1, 1, 1));
+ ProjectionDimension
+ column2 = new ProjectionDimension(new CarbonDimension(column2Schema, 1, 1, 1, 1));
- QueryDimension[] queryDimensions = { column1, column2 };
+ ProjectionDimension[] queryDimensions = { column1, column2 };
boolean[] dictionaryEncoding = CarbonUtil.getDirectDictionaryEncodingArray(queryDimensions);
boolean[] expectedDictionaryEncoding = { true, false };
@@ -577,19 +577,19 @@ public class CarbonUtilTest {
}
@Test public void testToGetComplexDataTypeArray() {
- QueryDimension column1 = new QueryDimension("Column1");
- QueryDimension column2 = new QueryDimension("Column2");
ColumnSchema column1Schema = new ColumnSchema();
ColumnSchema column2Schema = new ColumnSchema();
column1Schema.setColumnName("Column1");
column1Schema.setDataType(DataTypes.DATE);
- column1.setDimension(new CarbonDimension(column1Schema, 1, 1, 1, 1));
+ ProjectionDimension
+ column1 = new ProjectionDimension(new CarbonDimension(column1Schema, 1, 1, 1, 1));
column2Schema.setColumnName("Column2");
column2Schema.setDataType(DataTypes.createDefaultArrayType());
- column2.setDimension(new CarbonDimension(column2Schema, 1, 1, 1, 1));
+ ProjectionDimension
+ column2 = new ProjectionDimension(new CarbonDimension(column2Schema, 1, 1, 1, 1));
- QueryDimension[] queryDimensions = { column1, column2 };
+ ProjectionDimension[] queryDimensions = { column1, column2 };
boolean[] dictionaryEncoding = CarbonUtil.getComplexDataTypeArray(queryDimensions);
boolean[] expectedDictionaryEncoding = { false, true };
@@ -806,8 +806,8 @@ public class CarbonUtilTest {
@Test public void testToGetFirstIndexUsingBinarySearchWithCompareTo1() {
byte[] dataChunks = { 10, 20, 30, 40, 50, 60 };
byte[] compareValue = { 5 };
- FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk =
- new FixedLengthDimensionDataChunk(dataChunks, null, null, 6, 1);
+ FixedLengthDimensionColumnPage fixedLengthDimensionDataChunk =
+ new FixedLengthDimensionColumnPage(dataChunks, null, null, 6, 1);
int result = CarbonUtil
.getFirstIndexUsingBinarySearch(fixedLengthDimensionDataChunk, 1, 3, compareValue, false);
assertEquals(-2, result);
@@ -816,8 +816,8 @@ public class CarbonUtilTest {
@Test public void testToGetFirstIndexUsingBinarySearchWithCompareToLessThan0() {
byte[] dataChunks = { 10, 20, 30, 40, 50, 60 };
byte[] compareValue = { 30 };
- FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk =
- new FixedLengthDimensionDataChunk(dataChunks, null, null, 6, 1);
+ FixedLengthDimensionColumnPage fixedLengthDimensionDataChunk =
+ new FixedLengthDimensionColumnPage(dataChunks, null, null, 6, 1);
int result = CarbonUtil
.getFirstIndexUsingBinarySearch(fixedLengthDimensionDataChunk, 1, 3, compareValue, false);
assertEquals(2, result);
@@ -826,8 +826,8 @@ public class CarbonUtilTest {
@Test public void testToGetFirstIndexUsingBinarySearchWithCompareTo0() {
byte[] dataChunks = { 10, 10, 10, 40, 50, 60 };
byte[] compareValue = { 10 };
- FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk =
- new FixedLengthDimensionDataChunk(dataChunks, null, null, 6, 1);
+ FixedLengthDimensionColumnPage fixedLengthDimensionDataChunk =
+ new FixedLengthDimensionColumnPage(dataChunks, null, null, 6, 1);
int result = CarbonUtil
.getFirstIndexUsingBinarySearch(fixedLengthDimensionDataChunk, 1, 3, compareValue, false);
assertEquals(0, result);
@@ -836,8 +836,8 @@ public class CarbonUtilTest {
@Test public void testToGetFirstIndexUsingBinarySearchWithMatchUpLimitTrue() {
byte[] dataChunks = { 10, 10, 10, 40, 50, 60 };
byte[] compareValue = { 10 };
- FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk =
- new FixedLengthDimensionDataChunk(dataChunks, null, null, 6, 1);
+ FixedLengthDimensionColumnPage fixedLengthDimensionDataChunk =
+ new FixedLengthDimensionColumnPage(dataChunks, null, null, 6, 1);
int result = CarbonUtil
.getFirstIndexUsingBinarySearch(fixedLengthDimensionDataChunk, 1, 3, compareValue, true);
assertEquals(2, result);
@@ -847,13 +847,13 @@ public class CarbonUtilTest {
public void testBinaryRangeSearch() {
byte[] dataChunk = new byte[10];
- FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk;
+ FixedLengthDimensionColumnPage fixedLengthDimensionDataChunk;
byte[] keyWord = new byte[1];
int[] range;
dataChunk = "abbcccddddeffgggh".getBytes();
byte[][] dataArr = new byte[dataChunk.length / keyWord.length][keyWord.length];
- fixedLengthDimensionDataChunk = new FixedLengthDimensionDataChunk(dataChunk, null, null,
+ fixedLengthDimensionDataChunk = new FixedLengthDimensionColumnPage(dataChunk, null, null,
dataChunk.length / keyWord.length, keyWord.length);
for (int ii = 0; ii < dataChunk.length / keyWord.length; ii++) {
@@ -885,7 +885,7 @@ public class CarbonUtilTest {
assertRangeIndex(dataArr, dataChunk, fixedLengthDimensionDataChunk, keyWord, expectRangeIndex);
dataChunk = "ab".getBytes();
- fixedLengthDimensionDataChunk = new FixedLengthDimensionDataChunk(dataChunk, null, null,
+ fixedLengthDimensionDataChunk = new FixedLengthDimensionColumnPage(dataChunk, null, null,
dataChunk.length / keyWord.length, keyWord.length);
keyWord[0] = Byte.valueOf("97");
@@ -899,7 +899,7 @@ public class CarbonUtilTest {
assertEquals(1, range[1]);
dataChunk = "aabb".getBytes();
- fixedLengthDimensionDataChunk = new FixedLengthDimensionDataChunk(dataChunk, null, null,
+ fixedLengthDimensionDataChunk = new FixedLengthDimensionColumnPage(dataChunk, null, null,
dataChunk.length / keyWord.length, keyWord.length);
keyWord[0] = Byte.valueOf("97");
@@ -913,7 +913,7 @@ public class CarbonUtilTest {
assertEquals(3, range[1]);
dataChunk = "a".getBytes();
- fixedLengthDimensionDataChunk = new FixedLengthDimensionDataChunk(dataChunk, null, null,
+ fixedLengthDimensionDataChunk = new FixedLengthDimensionColumnPage(dataChunk, null, null,
dataChunk.length / keyWord.length, keyWord.length);
keyWord[0] = Byte.valueOf("97");
@@ -922,7 +922,7 @@ public class CarbonUtilTest {
assertEquals(0, range[1]);
dataChunk = "aa".getBytes();
- fixedLengthDimensionDataChunk = new FixedLengthDimensionDataChunk(dataChunk, null, null,
+ fixedLengthDimensionDataChunk = new FixedLengthDimensionColumnPage(dataChunk, null, null,
dataChunk.length / keyWord.length, keyWord.length);
keyWord[0] = Byte.valueOf("97");
@@ -931,7 +931,7 @@ public class CarbonUtilTest {
assertEquals(1, range[1]);
dataChunk = "aabbbbbbbbbbcc".getBytes();
- fixedLengthDimensionDataChunk = new FixedLengthDimensionDataChunk(dataChunk, null, null,
+ fixedLengthDimensionDataChunk = new FixedLengthDimensionColumnPage(dataChunk, null, null,
dataChunk.length / keyWord.length, keyWord.length);
keyWord[0] = Byte.valueOf("98");
range = CarbonUtil.getRangeIndexUsingBinarySearch(fixedLengthDimensionDataChunk, 0, dataChunk.length - 1, keyWord);
@@ -944,14 +944,14 @@ public class CarbonUtilTest {
public void IndexUsingBinarySearchLengthTwo() {
byte[] dataChunk = new byte[10];
- FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk;
+ FixedLengthDimensionColumnPage fixedLengthDimensionDataChunk;
byte[] keyWord = new byte[2];
dataChunk = "aabbbbbbbbbbcc".getBytes();
byte[][] dataArr = new byte[dataChunk.length / keyWord.length][keyWord.length];
- fixedLengthDimensionDataChunk = new FixedLengthDimensionDataChunk(dataChunk, null, null,
+ fixedLengthDimensionDataChunk = new FixedLengthDimensionColumnPage(dataChunk, null, null,
dataChunk.length / keyWord.length, keyWord.length);
for (int ii = 0; ii < dataChunk.length / keyWord.length; ii++) {
@@ -986,14 +986,14 @@ public class CarbonUtilTest {
public void IndexUsingBinarySearchLengthThree() {
byte[] dataChunk = new byte[10];
- FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk;
+ FixedLengthDimensionColumnPage fixedLengthDimensionDataChunk;
byte[] keyWord = new byte[3];
dataChunk = "aaabbbbbbbbbccc".getBytes();
byte[][] dataArr = new byte[dataChunk.length / keyWord.length][keyWord.length];
- fixedLengthDimensionDataChunk = new FixedLengthDimensionDataChunk(dataChunk, null, null,
+ fixedLengthDimensionDataChunk = new FixedLengthDimensionColumnPage(dataChunk, null, null,
dataChunk.length / keyWord.length, keyWord.length);
for (int ii = 0; ii < dataChunk.length / keyWord.length; ii++) {
@@ -1101,7 +1101,7 @@ public class CarbonUtilTest {
}
private void assertRangeIndex(byte[][] dataArr, byte[] dataChunk,
- FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk, byte[] keyWord, int[] expectRangeIndex) {
+ FixedLengthDimensionColumnPage fixedLengthDimensionDataChunk, byte[] keyWord, int[] expectRangeIndex) {
int[] range;
range = CarbonUtil.getRangeIndexUsingBinarySearch(fixedLengthDimensionDataChunk, 0,
(dataChunk.length - 1) / keyWord.length, keyWord);
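Taken together, the CarbonUtilTest hunks above are a mechanical rename (FixedLengthDimensionDataChunk becomes FixedLengthDimensionColumnPage) plus a constructor change: the mutable two-step QueryDimension setup becomes a single immutable ProjectionDimension construction. A condensed before/after, distilled from the hunks above rather than an additional change in this patch:

    // before: construct, then mutate
    QueryDimension column1 = new QueryDimension("Column1");
    column1.setDimension(new CarbonDimension(column1Schema, 1, 1, 1, 1));

    // after: the dimension is supplied at construction time
    ProjectionDimension column1 =
        new ProjectionDimension(new CarbonDimension(column1Schema, 1, 1, 1, 1));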
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
index de64c0a..e506994 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
@@ -25,10 +25,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.datastore.impl.FileHolderImpl;
+import org.apache.carbondata.core.datastore.impl.FileReaderImpl;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.metadata.blocklet.SegmentInfo;
@@ -229,13 +229,13 @@ public class DataFileFooterConverterTest {
}
@SuppressWarnings("unused") @Mock
- public FileHolder getFileHolder(FileFactory.FileType fileType) {
- return new FileHolderImpl();
+ public FileReader getFileHolder(FileFactory.FileType fileType) {
+ return new FileReaderImpl();
}
};
- new MockUp<FileHolderImpl>() {
+ new MockUp<FileReaderImpl>() {
@SuppressWarnings("unused") @Mock public long readLong(String filePath, long offset) {
return 1;
}
@@ -249,7 +249,6 @@ public class DataFileFooterConverterTest {
SegmentInfo segmentInfo = new SegmentInfo();
int[] arr = { 1, 2, 3 };
segmentInfo.setColumnCardinality(arr);
- segmentInfo.setNumberOfColumns(segmentInfo1.getNum_cols());
dataFileFooter.setNumberOfRows(3);
dataFileFooter.setSegmentInfo(segmentInfo);
TableBlockInfo info = new TableBlockInfo("/file.carbondata", 1, "0", new String[0], 1, ColumnarFormatVersion.V1, null);
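The only API change exercised in this test is the interface/implementation rename; condensed from the import and mock hunks above:

    // before
    FileHolder fileHolder = new FileHolderImpl();
    // after
    FileReader fileReader = new FileReaderImpl();

The dropped setNumberOfColumns call reflects that SegmentInfo no longer carries that field after the refactor.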
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/test/java/org/apache/carbondata/core/util/RangeFilterProcessorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/RangeFilterProcessorTest.java b/core/src/test/java/org/apache/carbondata/core/util/RangeFilterProcessorTest.java
index 4c9a784..4fb5dcc 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/RangeFilterProcessorTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/RangeFilterProcessorTest.java
@@ -36,7 +36,6 @@ import org.apache.carbondata.core.scan.expression.logical.RangeExpression;
import org.apache.carbondata.core.scan.expression.logical.TrueExpression;
import org.apache.carbondata.core.scan.filter.executer.RangeValueFilterExecuterImpl;
import org.apache.carbondata.core.scan.filter.intf.FilterOptimizer;
-import org.apache.carbondata.core.scan.filter.intf.FilterOptimizerBasic;
import org.apache.carbondata.core.scan.filter.optimizer.RangeFilterOptmizer;
import mockit.Deencapsulation;
@@ -102,7 +101,7 @@ public class RangeFilterProcessorTest {
new LessThanEqualToExpression(new ColumnExpression("a", DataTypes.STRING),
new LiteralExpression("20", DataTypes.STRING))), new TrueExpression(null));
FilterOptimizer rangeFilterOptimizer =
- new RangeFilterOptmizer(new FilterOptimizerBasic(), inputFilter);
+ new RangeFilterOptmizer(inputFilter);
rangeFilterOptimizer.optimizeFilter();
result = checkBothTrees(inputFilter, output);
Assert.assertTrue(result);
@@ -143,7 +142,7 @@ public class RangeFilterProcessorTest {
new LessThanEqualToExpression(new ColumnExpression("a", DataTypes.STRING),
new LiteralExpression("05", DataTypes.STRING)));
FilterOptimizer rangeFilterOptimizer =
- new RangeFilterOptmizer(new FilterOptimizerBasic(), inputFilter);
+ new RangeFilterOptmizer(inputFilter);
rangeFilterOptimizer.optimizeFilter();
result = checkBothTrees(inputFilter, output);
// no change
@@ -218,7 +217,7 @@ public class RangeFilterProcessorTest {
Expression Andb3 = new AndExpression(Andb2, new TrueExpression(null));
FilterOptimizer rangeFilterOptimizer =
- new RangeFilterOptmizer(new FilterOptimizerBasic(), inputFilter);
+ new RangeFilterOptmizer(inputFilter);
rangeFilterOptimizer.optimizeFilter();
result = checkBothTrees(inputFilter, new AndExpression(Andb3, new TrueExpression(null)));
// no change
@@ -302,7 +301,7 @@ public class RangeFilterProcessorTest {
Expression Orb3 = new OrExpression(Orb2, lessThanb2);
FilterOptimizer rangeFilterOptimizer =
- new RangeFilterOptmizer(new FilterOptimizerBasic(), inputFilter);
+ new RangeFilterOptmizer(inputFilter);
rangeFilterOptimizer.optimizeFilter();
result = checkBothTrees(inputFilter, new OrExpression(Orb3, lessThanb1));
// no change
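All four fixtures in this test change the same call site: RangeFilterOptmizer no longer takes a FilterOptimizerBasic, which is also why that import is removed at the top of the file. Condensed from the hunks above:

    // before
    FilterOptimizer optimizer = new RangeFilterOptmizer(new FilterOptimizerBasic(), inputFilter);
    // after
    FilterOptimizer optimizer = new RangeFilterOptmizer(inputFilter);
    optimizer.optimizeFilter();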
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/test/java/org/apache/carbondata/scanner/impl/FilterScannerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/scanner/impl/FilterScannerTest.java b/core/src/test/java/org/apache/carbondata/scanner/impl/FilterScannerTest.java
deleted file mode 100644
index 94c3f68..0000000
--- a/core/src/test/java/org/apache/carbondata/scanner/impl/FilterScannerTest.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.scanner.impl;
-
-import static junit.framework.TestCase.assertEquals;
-
-public class FilterScannerTest {
-//
-// private static FilterScanner filterScanner;
-// private static BlockletIndex blockletIndex;
-// private static BlockletMinMaxIndex blockletMinMaxIndex;
-// private static BTreeBuilderInfo bTreeBuilderInfo;
-// private static DataFileFooter dataFileFooter;
-//
-// @BeforeClass public static void setUp() {
-// BlockExecutionInfo blockExecutionInfo = new BlockExecutionInfo();
-// FilterExecuter filterExecutor = new AndFilterExecuterImpl(null, null);
-// blockExecutionInfo.setFilterExecuterTree(filterExecutor);
-// blockExecutionInfo.setFixedLengthKeySize(1);
-// blockExecutionInfo.setNoDictionaryBlockIndexes(new int[] { 1, 2 });
-// blockExecutionInfo.setDictionaryColumnBlockIndex(new int[] { 1 });
-// blockExecutionInfo.setColumnGroupToKeyStructureInfo(new HashMap<Integer, KeyStructureInfo>());
-// blockExecutionInfo.setComplexDimensionInfoMap(new HashMap<Integer, GenericQueryType>());
-// blockExecutionInfo.setComplexColumnParentBlockIndexes(new int[] { 1 });
-// blockExecutionInfo.setQueryDimensions(new QueryDimension[] { new QueryDimension("Col1") });
-// blockExecutionInfo.setAllSelectedDimensionBlocksIndexes(new int[][] { { 0, 0 } });
-// blockExecutionInfo.setAllSelectedMeasureBlocksIndexes(new int[][] { { 0, 0 } });
-// blockExecutionInfo.setTotalNumberOfMeasureBlock(1);
-// blockExecutionInfo.setTotalNumberDimensionBlock(1);
-// QueryStatisticsModel queryStatisticsModel = new QueryStatisticsModel();
-// QueryStatistic queryStatistic = new QueryStatistic();
-// queryStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM, 1);
-// Map<String, QueryStatistic> statisticsTypeAndObjMap = new HashMap<>();
-// statisticsTypeAndObjMap.put(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM, queryStatistic);
-// statisticsTypeAndObjMap.put(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM, queryStatistic);
-// queryStatisticsModel.setStatisticsTypeAndObjMap(statisticsTypeAndObjMap);
-// QueryStatisticsRecorder queryStatisticsRecorder = new QueryStatisticsRecorderImpl("1");
-// queryStatisticsModel.setRecorder(queryStatisticsRecorder);
-// filterScanner = new FilterScanner(blockExecutionInfo, queryStatisticsModel);
-// blockletIndex = new BlockletIndex();
-// blockletMinMaxIndex = new BlockletMinMaxIndex();
-// blockletMinMaxIndex.setMinValues(new byte[][] { { 1, 2 } });
-// blockletMinMaxIndex.setMaxValues(new byte[][] { { 10, 12 } });
-// blockletIndex.setMinMaxIndex(blockletMinMaxIndex);
-// dataFileFooter = new DataFileFooter();
-// dataFileFooter.setBlockletIndex(blockletIndex);
-// bTreeBuilderInfo = new BTreeBuilderInfo(Arrays.asList(dataFileFooter), new int[] { 1 });
-// }
-//
-// @Test public void testToScanBlockletWithEmptyBitSet() throws QueryExecutionException {
-// new MockUp<AndFilterExecuterImpl>() {
-// @SuppressWarnings("unused") @Mock
-// public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
-// return new BitSet();
-// }
-// };
-// BlocksChunkHolder blocksChunkHolder = new BlocksChunkHolder(1, 1);
-// DataRefNode dataRefNode = new BlockBTreeLeafNode(bTreeBuilderInfo, 0, 1);
-// blocksChunkHolder.setDataBlock(dataRefNode);
-// AbstractScannedResult abstractScannedResult = filterScanner.scanBlocklet(blocksChunkHolder);
-// assertEquals(0, abstractScannedResult.numberOfOutputRows());
-// }
-//
-// @Test public void testToScanBlockletWithNonEmptyBitSet() throws QueryExecutionException {
-// new MockUp<AndFilterExecuterImpl>() {
-// @SuppressWarnings("unused") @Mock
-// public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
-// BitSet bitSet = new BitSet();
-// bitSet.set(1);
-// bitSet.set(2);
-// bitSet.set(1);
-// return bitSet;
-// }
-//
-// @SuppressWarnings("unused") @Mock
-// public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
-// throws FilterUnsupportedException {
-// BitSet bitSet = new BitSet();
-// bitSet.set(1);
-// bitSet.set(2);
-// bitSet.set(1);
-// return bitSet;
-// }
-// };
-// DataRefNode dataRefNode = new MockUp<DataRefNode>() {
-// @Mock @SuppressWarnings("unused") DimensionColumnDataChunk[] getDimensionChunks(
-// FileHolder fileReader, int[][] blockIndexes) {
-// DimensionColumnDataChunk[] dimensionChunkAttributes =
-// { new ColumnGroupDimensionDataChunk(null, null) };
-// return dimensionChunkAttributes;
-// }
-//
-// @Mock @SuppressWarnings("unused") ColumnPage[] getMeasureChunks(
-// FileHolder fileReader, int[][] blockIndexes) {
-//
-// ColumnPage[] ColumnPages = { new ColumnPage() };
-// return ColumnPages;
-// }
-// }.getMockInstance();
-//
-// BlocksChunkHolder blocksChunkHolder = new BlocksChunkHolder(1, 1);
-// blocksChunkHolder.setDataBlock(dataRefNode);
-// DimensionChunkAttributes dimensionChunkAttributes = new DimensionChunkAttributes();
-// DimensionColumnDataChunk dimensionColumnDataChunk =
-// new FixedLengthDimensionDataChunk(new byte[] { 0, 1 }, dimensionChunkAttributes);
-// blocksChunkHolder.setDimensionRawDataChunk(new DimensionColumnDataChunk[]
-//
-// { dimensionColumnDataChunk });
-// ColumnPage ColumnPage = new ColumnPage();
-// blocksChunkHolder.setMeasureDataChunk(new ColumnPage[]
-//
-// { ColumnPage });
-// FileHolder fileHolder = new DFSFileHolderImpl();
-// blocksChunkHolder.setFileReader(fileHolder);
-// AbstractScannedResult abstractScannedResult = filterScanner.scanBlocklet(blocksChunkHolder);
-//
-// assertEquals(2, abstractScannedResult.numberOfOutputRows());
-// }
-//
-// @Test(expected = QueryExecutionException.class) public void testToScanBlockletWithException()
-// throws QueryExecutionException {
-// new MockUp<AndFilterExecuterImpl>() {
-// @SuppressWarnings("unused") @Mock
-// public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
-// BitSet bitSet = new BitSet();
-// bitSet.set(1);
-// bitSet.set(2);
-// bitSet.set(1);
-// return bitSet;
-// }
-//
-// @SuppressWarnings("unused") @Mock
-// public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
-// throws FilterUnsupportedException {
-// throw new FilterUnsupportedException("Filter unsupported");
-// }
-// };
-// BlocksChunkHolder blocksChunkHolder = new BlocksChunkHolder(1, 1);
-// BTreeBuilderInfo bTreeBuilderInfo =
-// new BTreeBuilderInfo(Arrays.asList(dataFileFooter), new int[] { 1 });
-// DataRefNode dataRefNode = new BlockBTreeLeafNode(bTreeBuilderInfo, 0, 1);
-// blocksChunkHolder.setDataBlock(dataRefNode);
-// filterScanner.scanBlocklet(blocksChunkHolder);
-// }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/dev/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/dev/findbugs-exclude.xml b/dev/findbugs-exclude.xml
index 1520cd4..b19db85 100644
--- a/dev/findbugs-exclude.xml
+++ b/dev/findbugs-exclude.xml
@@ -31,7 +31,7 @@
<Bug pattern="OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE"/>
</Match>
<Match>
- <Class name="org.apache.carbondata.core.datastore.impl.FileHolderImpl"/>
+ <Class name="org.apache.carbondata.core.datastore.impl.FileReaderImpl"/>
<Method name="getDataInputStream"/>
<Bug pattern="OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE"/>
</Match>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
index 76afcbf..7a15327 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
@@ -62,7 +62,7 @@ object CarbonSessionExample {
spark.sql(
s"""
- | SELECT *
+ | SELECT charField, stringField, intField
| FROM carbon_table
| WHERE stringfield = 'spark' AND decimalField > 40
""".stripMargin).show()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index fdec267..5a20d7e 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -58,7 +58,6 @@ import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
import org.apache.carbondata.core.scan.filter.SingleTableProvider;
import org.apache.carbondata.core.scan.filter.TableProvider;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.stats.QueryStatistic;
import org.apache.carbondata.core.stats.QueryStatisticsConstants;
@@ -112,11 +111,11 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
// comma separated list of input segment numbers
public static final String INPUT_SEGMENT_NUMBERS =
"mapreduce.input.carboninputformat.segmentnumbers";
- public static final String VALIDATE_INPUT_SEGMENT_IDs =
+ private static final String VALIDATE_INPUT_SEGMENT_IDs =
"mapreduce.input.carboninputformat.validsegments";
// comma separated list of input files
public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files";
- public static final String ALTER_PARTITION_ID = "mapreduce.input.carboninputformat.partitionid";
+ private static final String ALTER_PARTITION_ID = "mapreduce.input.carboninputformat.partitionid";
private static final Log LOG = LogFactory.getLog(CarbonTableInputFormat.class);
private static final String FILTER_PREDICATE =
"mapreduce.input.carboninputformat.filter.predicate";
@@ -127,7 +126,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr";
public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName";
public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName";
- public static final String PARTITIONS_TO_PRUNE =
+ private static final String PARTITIONS_TO_PRUNE =
"mapreduce.input.carboninputformat.partitions.to.prune";
public static final String UPADTE_T =
"mapreduce.input.carboninputformat.partitions.to.prune";
@@ -340,7 +339,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
* @return List<InputSplit> list of CarbonInputSplit
* @throws IOException
*/
- @Override public List<InputSplit> getSplits(JobContext job) throws IOException {
+ @Override
+ public List<InputSplit> getSplits(JobContext job) throws IOException {
AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
LoadMetadataDetails[] loadMetadataDetails = SegmentStatusManager
.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()));
@@ -811,28 +811,29 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
return split;
}
- @Override public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
+ @Override
+ public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
Configuration configuration = taskAttemptContext.getConfiguration();
- QueryModel queryModel = getQueryModel(inputSplit, taskAttemptContext);
+ QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext);
CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
return new CarbonRecordReader<T>(queryModel, readSupport);
}
- public QueryModel getQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+ public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException {
Configuration configuration = taskAttemptContext.getConfiguration();
CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
TableProvider tableProvider = new SingleTableProvider(carbonTable);
- // getting the table absoluteTableIdentifier from the carbonTable
- // to avoid unnecessary deserialization
- AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
// query plan includes projection column
- String projection = getColumnProjection(configuration);
- CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, projection);
- QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, carbonTable,
- getDataTypeConverter(configuration));
+ String projectionString = getColumnProjection(configuration);
+ String[] projectionColumnNames = null;
+ if (projectionString != null) {
+ projectionColumnNames = projectionString.split(",");
+ }
+ QueryModel queryModel = carbonTable.createQueryWithProjection(
+ projectionColumnNames, getDataTypeConverter(configuration));
// set the filter to the query model in order to filter blocklet before scan
Expression filter = getFilterPredicates(configuration);
@@ -887,7 +888,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
return readSupport;
}
- @Override protected boolean isSplitable(JobContext context, Path filename) {
+ @Override
+ protected boolean isSplitable(JobContext context, Path filename) {
try {
// Don't split the file if it is local file system
FileSystem fileSystem = filename.getFileSystem(context.getConfiguration());
@@ -901,16 +903,6 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
}
/**
- * required to be moved to core
- *
- * @return updateExtension
- */
- private String getUpdateExtension() {
- // TODO: required to modify when supporting update, mostly will be update timestamp
- return "update";
- }
-
- /**
* return valid segment to access
*/
public Segment[] getSegmentsToAccess(JobContext job) {
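The central API change in this file: getQueryModel is renamed to createQueryModel, and the CarbonQueryPlan intermediate is removed entirely; the projection is handed to the table as plain column names. The new path, extracted from the hunk above for readability:

    String projectionString = getColumnProjection(configuration);
    String[] projectionColumnNames = null;
    if (projectionString != null) {
      projectionColumnNames = projectionString.split(",");
    }
    QueryModel queryModel = carbonTable.createQueryWithProjection(
        projectionColumnNames, getDataTypeConverter(configuration));

The CarbonStreamRecordReader hunk further below is a follow-on of the same rename: it simply calls format.createQueryModel(split, context) instead of format.getQueryModel(split, context).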
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
index a590a5b..0fe0cbf 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
@@ -176,9 +176,7 @@ class InMemoryBTreeIndex implements Index {
filterredBlocks = filterExpressionProcessor.getFilterredBlocks(
abstractIndex.getDataRefNode(),
resolver,
- abstractIndex,
- identifier
- );
+ abstractIndex);
}
resultFilterredBlocks.addAll(filterredBlocks);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
index 95a7af0..1e227c4 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
@@ -156,7 +156,7 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
hadoopConf = context.getConfiguration();
if (model == null) {
CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
- model = format.getQueryModel(split, context);
+ model = format.createQueryModel(split, context);
}
carbonTable = model.getTable();
List<CarbonDimension> dimensions =
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java
index 89a4a9a..2f28861 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java
@@ -67,7 +67,7 @@ public class BlockLevelTraverser {
blockName = CarbonTablePath.getCarbonDataFileName(blockName);
blockName = blockName + CarbonTablePath.getCarbonDataExtension();
- long rowCount = currentBlock.nodeSize();
+ long rowCount = currentBlock.numRows();
String key = CarbonUpdateUtil.getSegmentBlockNameKey(segId, blockName);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 056c27b..9f8c5ec 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -26,18 +26,12 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
import org.apache.carbondata.core.scan.filter.TableProvider;
import org.apache.carbondata.core.scan.filter.intf.FilterOptimizer;
-import org.apache.carbondata.core.scan.filter.intf.FilterOptimizerBasic;
import org.apache.carbondata.core.scan.filter.optimizer.RangeFilterOptmizer;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
-import org.apache.carbondata.core.scan.model.QueryDimension;
-import org.apache.carbondata.core.scan.model.QueryMeasure;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
@@ -52,45 +46,14 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
*/
public class CarbonInputFormatUtil {
- public static CarbonQueryPlan createQueryPlan(CarbonTable carbonTable, String columnString) {
- String[] columns = null;
- if (columnString != null) {
- columns = columnString.split(",");
- }
- String factTableName = carbonTable.getTableName();
- CarbonQueryPlan plan = new CarbonQueryPlan(carbonTable.getDatabaseName(), factTableName);
- // fill dimensions
- // If columns are null, set all dimensions and measures
- int i = 0;
- if (columns != null) {
- for (String column : columns) {
- CarbonDimension dimensionByName = carbonTable.getDimensionByName(factTableName, column);
- if (dimensionByName != null) {
- addQueryDimension(plan, i, dimensionByName);
- i++;
- } else {
- CarbonMeasure measure = carbonTable.getMeasureByName(factTableName, column);
- if (measure == null) {
- throw new RuntimeException(column + " column not found in the table " + factTableName);
- }
- addQueryMeasure(plan, i, measure);
- i++;
- }
- }
- }
-
- plan.setQueryId(System.nanoTime() + "");
- return plan;
- }
-
public static <V> CarbonTableInputFormat<V> createCarbonInputFormat(
AbsoluteTableIdentifier identifier,
Job job) throws IOException {
CarbonTableInputFormat<V> carbonInputFormat = new CarbonTableInputFormat<>();
- carbonInputFormat.setDatabaseName(job.getConfiguration(),
- identifier.getCarbonTableIdentifier().getDatabaseName());
- carbonInputFormat
- .setTableName(job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName());
+ CarbonTableInputFormat.setDatabaseName(
+ job.getConfiguration(), identifier.getCarbonTableIdentifier().getDatabaseName());
+ CarbonTableInputFormat.setTableName(
+ job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName());
FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
return carbonInputFormat;
}
@@ -98,30 +61,16 @@ public class CarbonInputFormatUtil {
public static <V> CarbonTableInputFormat<V> createCarbonTableInputFormat(
AbsoluteTableIdentifier identifier, List<String> partitionId, Job job) throws IOException {
CarbonTableInputFormat<V> carbonTableInputFormat = new CarbonTableInputFormat<>();
- carbonTableInputFormat.setPartitionIdList(job.getConfiguration(), partitionId);
- carbonTableInputFormat.setDatabaseName(job.getConfiguration(),
- identifier.getCarbonTableIdentifier().getDatabaseName());
- carbonTableInputFormat
- .setTableName(job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName());
+ CarbonTableInputFormat.setPartitionIdList(
+ job.getConfiguration(), partitionId);
+ CarbonTableInputFormat.setDatabaseName(
+ job.getConfiguration(), identifier.getCarbonTableIdentifier().getDatabaseName());
+ CarbonTableInputFormat.setTableName(
+ job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName());
FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
return carbonTableInputFormat;
}
- private static void addQueryMeasure(CarbonQueryPlan plan, int order, CarbonMeasure measure) {
- QueryMeasure queryMeasure = new QueryMeasure(measure.getColName());
- queryMeasure.setQueryOrder(order);
- queryMeasure.setMeasure(measure);
- plan.addMeasure(queryMeasure);
- }
-
- private static void addQueryDimension(CarbonQueryPlan plan, int order,
- CarbonDimension dimension) {
- QueryDimension queryDimension = new QueryDimension(dimension.getColName());
- queryDimension.setQueryOrder(order);
- queryDimension.setDimension(dimension);
- plan.addDimension(queryDimension);
- }
-
public static void processFilterExpression(Expression filterExpression, CarbonTable carbonTable,
boolean[] isFilterDimensions, boolean[] isFilterMeasures) {
QueryModel.processFilterExpression(carbonTable, filterExpression, isFilterDimensions,
@@ -130,7 +79,7 @@ public class CarbonInputFormatUtil {
if (null != filterExpression) {
// Optimize Filter Expression and fit RANGE filters is conditions apply.
FilterOptimizer rangeFilterOptimizer =
- new RangeFilterOptmizer(new FilterOptimizerBasic(), filterExpression);
+ new RangeFilterOptmizer(filterExpression);
rangeFilterOptimizer.optimizeFilter();
}
}
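The createCarbonInputFormat/createCarbonTableInputFormat hunks above change only the call style: the configuration setters are static, so they are now invoked on the class rather than through an instance. Condensed from the hunks:

    // before: static setter invoked through an instance
    carbonInputFormat.setDatabaseName(job.getConfiguration(),
        identifier.getCarbonTableIdentifier().getDatabaseName());
    // after: invoked on the class
    CarbonTableInputFormat.setDatabaseName(
        job.getConfiguration(), identifier.getCarbonTableIdentifier().getDatabaseName());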
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
index f109e1c..1b57f93 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
@@ -30,7 +30,6 @@ import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.SingleTableProvider;
import org.apache.carbondata.core.scan.filter.TableProvider;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.util.DataTypeConverterImpl;
import org.apache.carbondata.hadoop.CarbonInputSplit;
@@ -140,11 +139,11 @@ public class MapredCarbonInputFormat extends CarbonTableInputFormat<ArrayWritabl
AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
- String projection = getProjection(configuration, carbonTable,
+ String projectionString = getProjection(configuration, carbonTable,
identifier.getCarbonTableIdentifier().getTableName());
- CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, projection);
- QueryModel queryModel =
- QueryModel.createModel(identifier, queryPlan, carbonTable, new DataTypeConverterImpl());
+ String[] projectionColumns = projectionString.split(",");
+ QueryModel queryModel = carbonTable.createQueryWithProjection(
+ projectionColumns, new DataTypeConverterImpl());
// set the filter to the query model in order to filter blocklet before scan
Expression filter = getFilterPredicates(configuration);
CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java
deleted file mode 100644
index 9a8f8c5..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.datatype.StructField;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.scan.executor.QueryExecutor;
-import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
-import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
-import org.apache.carbondata.core.scan.model.QueryDimension;
-import org.apache.carbondata.core.scan.model.QueryMeasure;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
-import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
-import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.hadoop.AbstractRecordReader;
-import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/**
- * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
- * carbondata column APIs and fills the data directly into columns.
- */
-class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
-
- private int batchIdx = 0;
-
- private int numBatched = 0;
-
- private CarbonVectorBatch columnarBatch;
-
- private CarbonColumnarBatch carbonColumnarBatch;
-
- /**
- * If true, this class returns batches instead of rows.
- */
- private boolean returnColumnarBatch;
-
- private QueryModel queryModel;
-
- private AbstractDetailQueryResultIterator iterator;
-
- private QueryExecutor queryExecutor;
-
- public CarbonVectorizedRecordReader(QueryExecutor queryExecutor, QueryModel queryModel, AbstractDetailQueryResultIterator iterator) {
- this.queryModel = queryModel;
- this.iterator = iterator;
- this.queryExecutor = queryExecutor;
- enableReturningBatches();
- }
-
- /**
- * Implementation of RecordReader API.
- */
- @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
- throws IOException, InterruptedException, UnsupportedOperationException {
- // The input split can contain single HDFS block or multiple blocks, so firstly get all the
- // blocks and then set them in the query model.
- List<CarbonInputSplit> splitList;
- if (inputSplit instanceof CarbonInputSplit) {
- splitList = new ArrayList<>(1);
- splitList.add((CarbonInputSplit) inputSplit);
- } else if (inputSplit instanceof CarbonMultiBlockSplit) {
- // contains multiple blocks, this is an optimization for concurrent query.
- CarbonMultiBlockSplit multiBlockSplit = (CarbonMultiBlockSplit) inputSplit;
- splitList = multiBlockSplit.getAllSplits();
- } else {
- throw new RuntimeException("unsupported input split type: " + inputSplit);
- }
- List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
- queryModel.setTableBlockInfos(tableBlockInfoList);
- queryModel.setVectorReader(true);
- try {
- queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
- iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel);
- } catch (QueryExecutionException e) {
- throw new InterruptedException(e.getMessage());
- }
- }
-
- @Override public void close() throws IOException {
- logStatistics(rowCount, queryModel.getStatisticsRecorder());
- if (columnarBatch != null) {
- columnarBatch = null;
- }
- // clear dictionary cache
- Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
- if (null != columnToDictionaryMapping) {
- for (Map.Entry<String, Dictionary> entry : columnToDictionaryMapping.entrySet()) {
- CarbonUtil.clearDictionaryCache(entry.getValue());
- }
- }
- try {
- queryExecutor.finish();
- } catch (QueryExecutionException e) {
- throw new IOException(e);
- }
- }
-
- @Override public boolean nextKeyValue() throws IOException, InterruptedException {
- resultBatch();
-
- if (returnColumnarBatch) return nextBatch();
-
- if (batchIdx >= numBatched) {
- if (!nextBatch()) return false;
- }
- ++batchIdx;
- return true;
- }
-
- @Override public Object getCurrentValue() throws IOException, InterruptedException {
- if (returnColumnarBatch) {
- rowCount += columnarBatch.numValidRows();
- return columnarBatch;
- } else {
- return null;
- }
- }
-
- @Override public Void getCurrentKey() throws IOException, InterruptedException {
- return null;
- }
-
- @Override public float getProgress() throws IOException, InterruptedException {
- // TODO : Implement it based on total number of rows it is going to retrive.
- return 0;
- }
-
- /**
- * Returns the ColumnarBatch object that will be used for all rows returned by this reader.
- * This object is reused. Calling this enables the vectorized reader. This should be called
- * before any calls to nextKeyValue/nextBatch.
- */
-
- private void initBatch() {
- List<QueryDimension> queryDimension = queryModel.getQueryDimension();
- List<QueryMeasure> queryMeasures = queryModel.getQueryMeasures();
- StructField[] fields = new StructField[queryDimension.size() + queryMeasures.size()];
- for (int i = 0; i < queryDimension.size(); i++) {
- QueryDimension dim = queryDimension.get(i);
- if (dim.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
- DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory
- .getDirectDictionaryGenerator(dim.getDimension().getDataType());
- fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
- generator.getReturnType());
- } else if (!dim.getDimension().hasEncoding(Encoding.DICTIONARY)) {
- fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
- dim.getDimension().getDataType());
- } else if (dim.getDimension().isComplex()) {
- fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
- dim.getDimension().getDataType());
- } else {
- fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
- DataTypes.INT);
- }
- }
-
- for (QueryMeasure msr : queryMeasures) {
- DataType dataType = msr.getMeasure().getDataType();
- if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT || dataType == DataTypes.INT
- || dataType == DataTypes.LONG) {
- fields[msr.getQueryOrder()] =
- new StructField(msr.getColumnName(), msr.getMeasure().getDataType());
- } else if (DataTypes.isDecimal(dataType)) {
- fields[msr.getQueryOrder()] =
- new StructField(msr.getColumnName(), msr.getMeasure().getDataType());
- } else {
- fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(), DataTypes.DOUBLE);
- }
- }
-
- columnarBatch = CarbonVectorBatch.allocate(fields);
- CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
- boolean[] filteredRows = new boolean[columnarBatch.capacity()];
- for (int i = 0; i < fields.length; i++) {
- vectors[i] = new CarbonColumnVectorWrapper(columnarBatch.column(i), filteredRows);
- }
- carbonColumnarBatch = new CarbonColumnarBatch(vectors, columnarBatch.capacity(), filteredRows);
- }
-
-
- private CarbonVectorBatch resultBatch() {
- if (columnarBatch == null) initBatch();
- return columnarBatch;
- }
-
- /*
- * Can be called before any rows are returned to enable returning columnar batches directly.
- */
- private void enableReturningBatches() {
- returnColumnarBatch = true;
- }
-
- /**
- * Advances to the next batch of rows. Returns false if there are no more.
- */
- private boolean nextBatch() {
- columnarBatch.reset();
- carbonColumnarBatch.reset();
- if (iterator.hasNext()) {
- iterator.processNextBatch(carbonColumnarBatch);
- int actualSize = carbonColumnarBatch.getActualSize();
- columnarBatch.setNumRows(actualSize);
- numBatched = actualSize;
- batchIdx = 0;
- return true;
- }
- return false;
- }
-
-
-}
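The deletion above appears to be a rename rather than a removal: judging by the remaining hunks in this message, the class lives on as PrestoCarbonVectorizedRecordReader with the same constructor signature, and the Presto callers are updated accordingly, e.g. in CarbondataRecordSet below:

    PrestoCarbonVectorizedRecordReader vectorReader =
        new PrestoCarbonVectorizedRecordReader(queryExecutor, queryModel,
            (AbstractDetailQueryResultIterator) iterator);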
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
index 1679f29..5f1f90a 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
@@ -54,7 +54,7 @@ class CarbondataPageSource implements ConnectorPageSource {
private final List<Type> types;
private final PageBuilder pageBuilder;
private boolean closed;
- private CarbonVectorizedRecordReader vectorReader;
+ private PrestoCarbonVectorizedRecordReader vectorReader;
private CarbonDictionaryDecodeReadSupport<Object[]> readSupport;
private long sizeOfData = 0;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
index c614fa9..5772fbf 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
@@ -54,7 +54,7 @@ public class CarbondataRecordCursor implements RecordCursor {
private CarbondataSplit split;
private CarbonDictionaryDecodeReadSupport readSupport;
private Tuple3<DataType, Dictionary, Int>[] dictionary;
- CarbonVectorizedRecordReader vectorizedRecordReader;
+ PrestoCarbonVectorizedRecordReader vectorizedRecordReader;
private long totalBytes;
private long nanoStart;
@@ -63,7 +63,7 @@ public class CarbondataRecordCursor implements RecordCursor {
public CarbondataRecordCursor(CarbonDictionaryDecodeReadSupport readSupport,
- CarbonVectorizedRecordReader vectorizedRecordReader,
+ PrestoCarbonVectorizedRecordReader vectorizedRecordReader,
List<CarbondataColumnHandle> columnHandles,
CarbondataSplit split) {
this.vectorizedRecordReader = vectorizedRecordReader;
@@ -194,7 +194,7 @@ public class CarbondataRecordCursor implements RecordCursor {
//todo delete cache from readSupport
}
- public CarbonVectorizedRecordReader getVectorizedRecordReader() {
+ public PrestoCarbonVectorizedRecordReader getVectorizedRecordReader() {
return vectorizedRecordReader;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
index 0f8fe87..286ff0e 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
@@ -78,8 +78,8 @@ public class CarbondataRecordSet implements RecordSet {
readSupport
.initialize(queryModel.getProjectionColumns(), queryModel.getTable());
CarbonIterator iterator = queryExecutor.execute(queryModel);
- CarbonVectorizedRecordReader vectorReader =
- new CarbonVectorizedRecordReader(queryExecutor, queryModel,
+ PrestoCarbonVectorizedRecordReader vectorReader =
+ new PrestoCarbonVectorizedRecordReader(queryExecutor, queryModel,
(AbstractDetailQueryResultIterator) iterator);
return new CarbondataRecordCursor(readSupport, vectorReader, columns, split);
} catch (QueryExecutionException e) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
index f039daf..5a2f831 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
@@ -105,7 +105,7 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
new TaskAttemptContextImpl(jobConf, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
CarbonInputSplit carbonInputSplit =
CarbonLocalInputSplit.convertSplit(carbondataSplit.getLocalInputSplit());
- queryModel = carbonTableInputFormat.getQueryModel(carbonInputSplit, hadoopAttemptContext);
+ queryModel = carbonTableInputFormat.createQueryModel(carbonInputSplit, hadoopAttemptContext);
queryModel.setVectorReader(true);
} catch (IOException e) {
throw new RuntimeException("Unable to get the Query Model ", e);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
new file mode 100644
index 0000000..a1907db
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.scan.executor.QueryExecutor;
+import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
+import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.model.ProjectionDimension;
+import org.apache.carbondata.core.scan.model.ProjectionMeasure;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.hadoop.AbstractRecordReader;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * A specialized RecordReader that reads directly into InternalRows or ColumnarBatches,
+ * filling the columns through the CarbonData column APIs.
+ */
+class PrestoCarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
+
+ private int batchIdx = 0;
+
+ private int numBatched = 0;
+
+ private CarbonVectorBatch columnarBatch;
+
+ private CarbonColumnarBatch carbonColumnarBatch;
+
+ /**
+ * If true, this class returns batches instead of rows.
+ */
+ private boolean returnColumnarBatch;
+
+ private QueryModel queryModel;
+
+ private AbstractDetailQueryResultIterator iterator;
+
+ private QueryExecutor queryExecutor;
+
+ public PrestoCarbonVectorizedRecordReader(QueryExecutor queryExecutor, QueryModel queryModel, AbstractDetailQueryResultIterator iterator) {
+ this.queryModel = queryModel;
+ this.iterator = iterator;
+ this.queryExecutor = queryExecutor;
+ enableReturningBatches();
+ }
+
+ /**
+ * Implementation of RecordReader API.
+ */
+ @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException, UnsupportedOperationException {
+ // The input split can contain a single HDFS block or multiple blocks, so first collect
+ // all the blocks and then set them in the query model.
+ List<CarbonInputSplit> splitList;
+ if (inputSplit instanceof CarbonInputSplit) {
+ splitList = new ArrayList<>(1);
+ splitList.add((CarbonInputSplit) inputSplit);
+ } else if (inputSplit instanceof CarbonMultiBlockSplit) {
+ // contains multiple blocks; this is an optimization for concurrent queries.
+ CarbonMultiBlockSplit multiBlockSplit = (CarbonMultiBlockSplit) inputSplit;
+ splitList = multiBlockSplit.getAllSplits();
+ } else {
+ throw new RuntimeException("unsupported input split type: " + inputSplit);
+ }
+ List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
+ queryModel.setTableBlockInfos(tableBlockInfoList);
+ queryModel.setVectorReader(true);
+ try {
+ queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+ iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel);
+ } catch (QueryExecutionException e) {
+ throw new InterruptedException(e.getMessage());
+ }
+ }
+
+ @Override public void close() throws IOException {
+ logStatistics(rowCount, queryModel.getStatisticsRecorder());
+ if (columnarBatch != null) {
+ columnarBatch = null;
+ }
+ // clear dictionary cache
+ Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
+ if (null != columnToDictionaryMapping) {
+ for (Map.Entry<String, Dictionary> entry : columnToDictionaryMapping.entrySet()) {
+ CarbonUtil.clearDictionaryCache(entry.getValue());
+ }
+ }
+ try {
+ queryExecutor.finish();
+ } catch (QueryExecutionException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+ resultBatch();
+
+ if (returnColumnarBatch) return nextBatch();
+
+ if (batchIdx >= numBatched) {
+ if (!nextBatch()) return false;
+ }
+ ++batchIdx;
+ return true;
+ }
+
+ @Override public Object getCurrentValue() throws IOException, InterruptedException {
+ if (returnColumnarBatch) {
+ rowCount += columnarBatch.numValidRows();
+ return columnarBatch;
+ } else {
+ return null;
+ }
+ }
+
+ @Override public Void getCurrentKey() throws IOException, InterruptedException {
+ return null;
+ }
+
+ @Override public float getProgress() throws IOException, InterruptedException {
+ // TODO : Implement it based on the total number of rows it is going to retrieve.
+ return 0;
+ }
+
+ /**
+ * Returns the ColumnarBatch object that will be used for all rows returned by this reader.
+ * This object is reused. Calling this enables the vectorized reader. This should be called
+ * before any calls to nextKeyValue/nextBatch.
+ */
+
+ private void initBatch() {
+ List<ProjectionDimension> queryDimension = queryModel.getProjectionDimensions();
+ List<ProjectionMeasure> queryMeasures = queryModel.getProjectionMeasures();
+ StructField[] fields = new StructField[queryDimension.size() + queryMeasures.size()];
+ for (int i = 0; i < queryDimension.size(); i++) {
+ ProjectionDimension dim = queryDimension.get(i);
+ if (dim.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+ DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory
+ .getDirectDictionaryGenerator(dim.getDimension().getDataType());
+ fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
+ generator.getReturnType());
+ } else if (!dim.getDimension().hasEncoding(Encoding.DICTIONARY)) {
+ fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
+ dim.getDimension().getDataType());
+ } else if (dim.getDimension().isComplex()) {
+ fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
+ dim.getDimension().getDataType());
+ } else {
+ fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
+ DataTypes.INT);
+ }
+ }
+
+ for (ProjectionMeasure msr : queryMeasures) {
+ DataType dataType = msr.getMeasure().getDataType();
+ if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT || dataType == DataTypes.INT
+ || dataType == DataTypes.LONG) {
+ fields[msr.getOrdinal()] =
+ new StructField(msr.getColumnName(), msr.getMeasure().getDataType());
+ } else if (DataTypes.isDecimal(dataType)) {
+ fields[msr.getOrdinal()] =
+ new StructField(msr.getColumnName(), msr.getMeasure().getDataType());
+ } else {
+ fields[msr.getOrdinal()] = new StructField(msr.getColumnName(), DataTypes.DOUBLE);
+ }
+ }
+
+ columnarBatch = CarbonVectorBatch.allocate(fields);
+ CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
+ boolean[] filteredRows = new boolean[columnarBatch.capacity()];
+ for (int i = 0; i < fields.length; i++) {
+ vectors[i] = new CarbonColumnVectorWrapper(columnarBatch.column(i), filteredRows);
+ }
+ carbonColumnarBatch = new CarbonColumnarBatch(vectors, columnarBatch.capacity(), filteredRows);
+ }
+
+
+ private CarbonVectorBatch resultBatch() {
+ if (columnarBatch == null) initBatch();
+ return columnarBatch;
+ }
+
+ /*
+ * Can be called before any rows are returned to enable returning columnar batches directly.
+ */
+ private void enableReturningBatches() {
+ returnColumnarBatch = true;
+ }
+
+ /**
+ * Advances to the next batch of rows. Returns false if there are no more.
+ */
+ private boolean nextBatch() {
+ columnarBatch.reset();
+ carbonColumnarBatch.reset();
+ if (iterator.hasNext()) {
+ iterator.processNextBatch(carbonColumnarBatch);
+ int actualSize = carbonColumnarBatch.getActualSize();
+ columnarBatch.setNumRows(actualSize);
+ numBatched = actualSize;
+ batchIdx = 0;
+ return true;
+ }
+ return false;
+ }
+
+
+}
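The new reader's contract: the constructor enables batch mode, so nextKeyValue() advances one whole batch and getCurrentValue() hands back the reused CarbonVectorBatch. A minimal usage sketch under those assumptions (package-local, since the class is package-private; not part of this patch):

    // Sketch only: drive the reader batch by batch and count surviving rows.
    static long countRows(PrestoCarbonVectorizedRecordReader reader) throws Exception {
      long totalRows = 0;
      try {
        // Each nextKeyValue() call fills one whole CarbonVectorBatch.
        while (reader.nextKeyValue()) {
          CarbonVectorBatch batch = (CarbonVectorBatch) reader.getCurrentValue();
          totalRows += batch.numValidRows();  // rows that survived filtering
        }
      } finally {
        // finishes the QueryExecutor and clears the dictionary cache
        reader.close();
      }
      return totalRows;
    }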
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index ab3ab5d..3c70619 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -171,7 +171,8 @@ class CarbonMergerRDD[K, V](
LOGGER.info(s"Restructured block exists: $restructuredBlockExists")
DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties,
- carbonTable, dataFileMetadataSegMapping, restructuredBlockExists)
+ carbonTable, dataFileMetadataSegMapping, restructuredBlockExists,
+ new SparkDataTypeConverterImpl)
// fire a query and get the results.
var result2: java.util.List[RawResultIterator] = null
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 772f702..97be1fb 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -336,7 +336,7 @@ class CarbonScanRDD(
TaskMetricsMap.getInstance().registerThreadCallback()
inputMetricsStats.initBytesReadCallback(context, inputSplit)
val iterator = if (inputSplit.getAllSplits.size() > 0) {
- val model = format.getQueryModel(inputSplit, attemptContext)
+ val model = format.createQueryModel(inputSplit, attemptContext)
// get RecordReader by FileFormat
val reader: RecordReader[Void, Object] = inputSplit.getFileFormat match {
case FileFormat.ROW_V1 =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
index 7d42130..432d50a 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -40,7 +40,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
private DataType blockDataType;
- public ColumnarVectorWrapper(ColumnVector columnVector, boolean[] filteredRows) {
+ ColumnarVectorWrapper(ColumnVector columnVector, boolean[] filteredRows) {
this.columnVector = columnVector;
this.filteredRows = filteredRows;
this.dataType = CarbonScalaUtil.convertSparkToCarbonDataType(columnVector.dataType());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 5d927df..73da878 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -35,8 +35,8 @@ import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.scan.executor.QueryExecutor;
import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
-import org.apache.carbondata.core.scan.model.QueryDimension;
-import org.apache.carbondata.core.scan.model.QueryMeasure;
+import org.apache.carbondata.core.scan.model.ProjectionDimension;
+import org.apache.carbondata.core.scan.model.ProjectionMeasure;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
@@ -100,7 +100,8 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
/**
* Implementation of RecordReader API.
*/
- @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException, UnsupportedOperationException {
// The input split can contain a single HDFS block or multiple blocks, so first collect
// all the blocks and then set them in the query model.
@@ -145,7 +146,8 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
}
}
- @Override public void close() throws IOException {
+ @Override
+ public void close() throws IOException {
logStatistics(rowCount, queryModel.getStatisticsRecorder());
if (columnarBatch != null) {
columnarBatch.close();
@@ -165,10 +167,13 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
}
}
- @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
resultBatch();
- if (returnColumnarBatch) return nextBatch();
+ if (returnColumnarBatch) {
+ return nextBatch();
+ }
if (batchIdx >= numBatched) {
if (!nextBatch()) return false;
@@ -177,7 +182,8 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
return true;
}
- @Override public Object getCurrentValue() throws IOException, InterruptedException {
+ @Override
+ public Object getCurrentValue() throws IOException, InterruptedException {
if (returnColumnarBatch) {
int value = columnarBatch.numValidRows();
rowCount += value;
@@ -190,11 +196,13 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
return columnarBatch.getRow(batchIdx - 1);
}
- @Override public Void getCurrentKey() throws IOException, InterruptedException {
+ @Override
+ public Void getCurrentKey() throws IOException, InterruptedException {
return null;
}
- @Override public float getProgress() throws IOException, InterruptedException {
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
// TODO : Implement it based on the total number of rows it is going to retrieve.
return 0;
}
@@ -206,44 +214,44 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
*/
private void initBatch(MemoryMode memMode) {
- List<QueryDimension> queryDimension = queryModel.getQueryDimension();
- List<QueryMeasure> queryMeasures = queryModel.getQueryMeasures();
+ List<ProjectionDimension> queryDimension = queryModel.getProjectionDimensions();
+ List<ProjectionMeasure> queryMeasures = queryModel.getProjectionMeasures();
StructField[] fields = new StructField[queryDimension.size() + queryMeasures.size()];
for (int i = 0; i < queryDimension.size(); i++) {
- QueryDimension dim = queryDimension.get(i);
+ ProjectionDimension dim = queryDimension.get(i);
if (dim.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory
.getDirectDictionaryGenerator(dim.getDimension().getDataType());
- fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
+ fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
CarbonScalaUtil.convertCarbonToSparkDataType(generator.getReturnType()), true, null);
} else if (!dim.getDimension().hasEncoding(Encoding.DICTIONARY)) {
- fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
+ fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
CarbonScalaUtil.convertCarbonToSparkDataType(dim.getDimension().getDataType()), true,
null);
} else if (dim.getDimension().isComplex()) {
- fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
+ fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
CarbonScalaUtil.convertCarbonToSparkDataType(dim.getDimension().getDataType()), true,
null);
} else {
- fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
+ fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
CarbonScalaUtil.convertCarbonToSparkDataType(DataTypes.INT), true, null);
}
}
for (int i = 0; i < queryMeasures.size(); i++) {
- QueryMeasure msr = queryMeasures.get(i);
+ ProjectionMeasure msr = queryMeasures.get(i);
DataType dataType = msr.getMeasure().getDataType();
if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT ||
dataType == DataTypes.INT || dataType == DataTypes.LONG) {
- fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
+ fields[msr.getOrdinal()] = new StructField(msr.getColumnName(),
CarbonScalaUtil.convertCarbonToSparkDataType(msr.getMeasure().getDataType()), true,
null);
} else if (DataTypes.isDecimal(dataType)) {
- fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
+ fields[msr.getOrdinal()] = new StructField(msr.getColumnName(),
new DecimalType(msr.getMeasure().getPrecision(), msr.getMeasure().getScale()), true,
null);
} else {
- fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
+ fields[msr.getOrdinal()] = new StructField(msr.getColumnName(),
CarbonScalaUtil.convertCarbonToSparkDataType(DataTypes.DOUBLE), true, null);
}
}
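On the measure side of the same schema-building step, anything that is not a boolean, integral, or decimal type is widened to DOUBLE. A sketch of that rule with an illustrative helper name, assuming the imports already present in this file:

    // Illustrative only: the measure-type rule the loop above applies.
    private org.apache.spark.sql.types.DataType sparkTypeFor(ProjectionMeasure msr) {
      DataType t = msr.getMeasure().getDataType();
      if (t == DataTypes.BOOLEAN || t == DataTypes.SHORT
          || t == DataTypes.INT || t == DataTypes.LONG) {
        // integral and boolean measures keep their own type
        return CarbonScalaUtil.convertCarbonToSparkDataType(t);
      } else if (DataTypes.isDecimal(t)) {
        // decimals carry their exact precision and scale into Spark
        return new DecimalType(msr.getMeasure().getPrecision(),
            msr.getMeasure().getScale());
      } else {
        // everything else is widened to double
        return CarbonScalaUtil.convertCarbonToSparkDataType(DataTypes.DOUBLE);
      }
    }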
@@ -261,9 +269,8 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
initBatch(DEFAULT_MEMORY_MODE);
}
- private ColumnarBatch resultBatch() {
+ private void resultBatch() {
if (columnarBatch == null) initBatch();
- return columnarBatch;
}
/*