You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2017/06/01 15:09:06 UTC
[1/2] carbondata git commit: Supported IUD for vector reader
Repository: carbondata
Updated Branches:
refs/heads/master 891062eb3 -> ddb80f729
Supported IUD for vector reader
Fixed commets
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/1c5b4a55
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/1c5b4a55
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/1c5b4a55
Branch: refs/heads/master
Commit: 1c5b4a5558b99de41e29091fbae879d76665de3a
Parents: 891062e
Author: ravipesala <ra...@gmail.com>
Authored: Wed May 31 20:54:49 2017 +0530
Committer: chenliang613 <ch...@apache.org>
Committed: Thu Jun 1 23:07:24 2017 +0800
----------------------------------------------------------------------
.../DictionaryBasedVectorResultCollector.java | 5 +-
.../core/scan/result/AbstractScannedResult.java | 25 +++-
.../scan/result/vector/CarbonColumnVector.java | 2 +
.../scan/result/vector/CarbonColumnarBatch.java | 33 ++++-
.../dataload/TestBatchSortDataLoad.scala | 20 +--
.../iud/UpdateCarbonTableTestCase.scala | 2 +-
.../vectorreader/ColumnarVectorWrapper.java | 121 ++++++++++++++++---
.../VectorizedCarbonRecordReader.java | 5 +-
.../spark/sql/hive/CarbonAnalysisRules.scala | 8 +-
.../spark/sql/hive/CarbonSessionState.scala | 3 +-
10 files changed, 180 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1c5b4a55/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
index 91afe77..7a8fe06 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
@@ -144,6 +144,8 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
return;
}
fillColumnVectorDetails(columnarBatch, rowCounter, requiredRows);
+ scannedResult.markFilteredRows(
+ columnarBatch, rowCounter, requiredRows, columnarBatch.getRowCounter());
scanAndFillResult(scannedResult, columnarBatch, rowCounter, availableRows, requiredRows);
}
}
@@ -162,7 +164,8 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
// Or set the row counter.
scannedResult.setRowCounter(rowCounter + requiredRows);
}
- columnarBatch.setActualSize(columnarBatch.getActualSize() + requiredRows);
+ columnarBatch.setActualSize(
+ columnarBatch.getActualSize() + requiredRows - columnarBatch.getRowsFilteredCount());
columnarBatch.setRowCounter(columnarBatch.getRowCounter() + requiredRows);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1c5b4a55/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
index e57a290..a1074ea 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo;
import org.apache.carbondata.core.scan.filter.GenericQueryType;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -283,7 +284,8 @@ public abstract class AbstractScannedResult {
String data = getBlockletId();
if (CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID
.equals(columnVectorInfo.dimension.getColumnName())) {
- data = data + CarbonCommonConstants.FILE_SEPARATOR + j;
+ data = data + CarbonCommonConstants.FILE_SEPARATOR +
+ (rowMapping == null ? j : rowMapping[pageCounter][j]);
}
vector.putBytes(vectorOffset++, offset, data.length(), data.getBytes());
}
@@ -638,4 +640,25 @@ public abstract class AbstractScannedResult {
BlockletLevelDeleteDeltaDataCache blockletDeleteDeltaCache) {
this.blockletDeleteDeltaCache = blockletDeleteDeltaCache;
}
+
+ /**
+ * Mark the filtered rows in columnar batch. These rows will not be added to vector batches later.
+ * @param columnarBatch
+ * @param startRow
+ * @param size
+ * @param vectorOffset
+ */
+ public void markFilteredRows(CarbonColumnarBatch columnarBatch, int startRow, int size,
+ int vectorOffset) {
+ if (blockletDeleteDeltaCache != null) {
+ int len = startRow + size;
+ for (int i = startRow; i < len; i++) {
+ int rowId = rowMapping != null ? rowMapping[pageCounter][i] : i;
+ if (blockletDeleteDeltaCache.contains(rowId)) {
+ columnarBatch.markFiltered(vectorOffset);
+ }
+ vectorOffset++;
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1c5b4a55/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
index 82a0b45..a3eb48b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
@@ -66,4 +66,6 @@ public interface CarbonColumnVector {
DataType getType();
+ void setFilteredRowsExist(boolean filteredRowsExist);
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1c5b4a55/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
index faeffde..cfc2f16 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
@@ -17,6 +17,8 @@
package org.apache.carbondata.core.scan.result.vector;
+import java.util.Arrays;
+
public class CarbonColumnarBatch {
public CarbonColumnVector[] columnVectors;
@@ -27,9 +29,15 @@ public class CarbonColumnarBatch {
private int rowCounter;
- public CarbonColumnarBatch(CarbonColumnVector[] columnVectors, int batchSize) {
+ private boolean[] filteredRows;
+
+ private int rowsFiltered;
+
+ public CarbonColumnarBatch(CarbonColumnVector[] columnVectors, int batchSize,
+ boolean[] filteredRows) {
this.columnVectors = columnVectors;
this.batchSize = batchSize;
+ this.filteredRows = filteredRows;
}
public int getBatchSize() {
@@ -47,6 +55,8 @@ public class CarbonColumnarBatch {
public void reset() {
actualSize = 0;
rowCounter = 0;
+ rowsFiltered = 0;
+ Arrays.fill(filteredRows, false);
for (int i = 0; i < columnVectors.length; i++) {
columnVectors[i].reset();
}
@@ -59,4 +69,25 @@ public class CarbonColumnarBatch {
public void setRowCounter(int rowCounter) {
this.rowCounter = rowCounter;
}
+
+ /**
+ * Mark the rows as filterd first before filling the batch, so that these rows will not be added
+ * to vector batches.
+ * @param rowId
+ */
+ public void markFiltered(int rowId) {
+ if (!filteredRows[rowId]) {
+ filteredRows[rowId] = true;
+ rowsFiltered++;
+ }
+ if (rowsFiltered == 1) {
+ for (int i = 0; i < columnVectors.length; i++) {
+ columnVectors[i].setFilteredRowsExist(true);
+ }
+ }
+ }
+
+ public int getRowsFilteredCount() {
+ return rowsFiltered;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1c5b4a55/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
index 43bcac8..d53b5e5 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
@@ -36,7 +36,7 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
val writer = new BufferedWriter(new FileWriter(file))
writer.write("c1,c2,c3, c4, c5, c6, c7, c8, c9, c10")
writer.newLine()
- for(i <- 0 until 200000) {
+ for(i <- 0 until 100000) {
writer.write("a" + i%1000 + "," +
"b" + i%1000 + "," +
"c" + i%1000 + "," +
@@ -84,9 +84,9 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load1 " +
s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1')")
- checkAnswer(sql("select count(*) from carbon_load1"), Seq(Row(200000)))
+ checkAnswer(sql("select count(*) from carbon_load1"), Seq(Row(100000)))
- assert(getIndexfileCount("carbon_load1") == 10, "Something wrong in batch sort")
+ assert(getIndexfileCount("carbon_load1") == 5, "Something wrong in batch sort")
}
test("test batch sort load by passing option to load command and compare with normal load") {
@@ -115,7 +115,7 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1')")
sql("alter table carbon_load1 compact 'major'")
Thread.sleep(4000)
- checkAnswer(sql("select count(*) from carbon_load1"), Seq(Row(800000)))
+ checkAnswer(sql("select count(*) from carbon_load1"), Seq(Row(400000)))
assert(getIndexfileCount("carbon_load1", "0.1") == 1, "Something wrong in compaction after batch sort")
@@ -137,7 +137,7 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1')")
sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load5 ")
- checkAnswer(sql("select count(*) from carbon_load5"), Seq(Row(800000)))
+ checkAnswer(sql("select count(*) from carbon_load5"), Seq(Row(400000)))
checkAnswer(sql("select * from carbon_load1 where c1='a1' order by c1"),
sql("select * from carbon_load5 where c1='a1' order by c1"))
@@ -165,9 +165,9 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load3 " +
s"OPTIONS('sort_scope'='batch_sort', 'batch_sort_size_inmb'='1', 'single_pass'='true')")
- checkAnswer(sql("select count(*) from carbon_load3"), Seq(Row(200000)))
+ checkAnswer(sql("select count(*) from carbon_load3"), Seq(Row(100000)))
- assert(getIndexfileCount("carbon_load3") == 10, "Something wrong in batch sort")
+ assert(getIndexfileCount("carbon_load3") == 5, "Something wrong in batch sort")
checkAnswer(sql("select * from carbon_load3 where c1='a1' order by c1"),
sql("select * from carbon_load2 where c1='a1' order by c1"))
@@ -186,9 +186,9 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load4 " )
- checkAnswer(sql("select count(*) from carbon_load4"), Seq(Row(200000)))
+ checkAnswer(sql("select count(*) from carbon_load4"), Seq(Row(100000)))
- assert(getIndexfileCount("carbon_load4") == 10, "Something wrong in batch sort")
+ assert(getIndexfileCount("carbon_load4") == 5, "Something wrong in batch sort")
CarbonProperties.getInstance().
addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
@@ -206,7 +206,7 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
sql(s"LOAD DATA LOCAL INPATH '$filePath' into table carbon_load6 " )
- checkAnswer(sql("select count(*) from carbon_load6"), Seq(Row(200000)))
+ checkAnswer(sql("select count(*) from carbon_load6"), Seq(Row(100000)))
assert(getIndexfileCount("carbon_load6") == 1, "Something wrong in batch sort")
CarbonProperties.getInstance().
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1c5b4a55/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index 25fe91b..7917b61 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -42,7 +42,7 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.isHorizontalCompactionEnabled , "true")
CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER , "false")
+ .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER , "true")
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1c5b4a55/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
index ad5e01f..c3d2a87 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -27,80 +27,158 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
private ColumnVector columnVector;
- public ColumnarVectorWrapper(ColumnVector columnVector) {
+ private boolean[] filteredRows;
+
+ private int counter;
+
+ private boolean filteredRowsExist;
+
+ public ColumnarVectorWrapper(ColumnVector columnVector, boolean[] filteredRows) {
this.columnVector = columnVector;
+ this.filteredRows = filteredRows;
}
@Override public void putBoolean(int rowId, boolean value) {
- columnVector.putBoolean(rowId, value);
+ if (!filteredRows[rowId]) {
+ columnVector.putBoolean(counter++, value);
+ }
}
@Override public void putFloat(int rowId, float value) {
- columnVector.putFloat(rowId, value);
+ if (!filteredRows[rowId]) {
+ columnVector.putFloat(counter++, value);
+ }
}
@Override public void putShort(int rowId, short value) {
- columnVector.putShort(rowId, value);
+ if (!filteredRows[rowId]) {
+ columnVector.putShort(counter++, value);
+ }
}
@Override public void putShorts(int rowId, int count, short value) {
- columnVector.putShorts(rowId, count, value);
+ if (filteredRowsExist) {
+ for (int i = 0; i < count; i++) {
+ if (!filteredRows[rowId]) {
+ putShort(counter++, value);
+ }
+ rowId++;
+ }
+ } else {
+ columnVector.putShorts(rowId, count, value);
+ }
}
@Override public void putInt(int rowId, int value) {
- columnVector.putInt(rowId, value);
+ if (!filteredRows[rowId]) {
+ columnVector.putInt(counter++, value);
+ }
}
@Override public void putInts(int rowId, int count, int value) {
- columnVector.putInts(rowId, count, value);
+ if (filteredRowsExist) {
+ for (int i = 0; i < count; i++) {
+ if (!filteredRows[rowId]) {
+ putInt(counter++, value);
+ }
+ rowId++;
+ }
+ } else {
+ columnVector.putInts(rowId, count, value);
+ }
}
@Override public void putLong(int rowId, long value) {
- columnVector.putLong(rowId, value);
+ if (!filteredRows[rowId]) {
+ columnVector.putLong(counter++, value);
+ }
}
@Override public void putLongs(int rowId, int count, long value) {
- columnVector.putLongs(rowId, count, value);
+ if (filteredRowsExist) {
+ for (int i = 0; i < count; i++) {
+ if (!filteredRows[rowId]) {
+ putLong(counter++, value);
+ }
+ rowId++;
+ }
+ } else {
+ columnVector.putLongs(rowId, count, value);
+ }
}
@Override public void putDecimal(int rowId, Decimal value, int precision) {
- columnVector.putDecimal(rowId, value, precision);
+ if (!filteredRows[rowId]) {
+ columnVector.putDecimal(counter++, value, precision);
+ }
}
@Override public void putDecimals(int rowId, int count, Decimal value, int precision) {
for (int i = 0; i < count; i++) {
- putDecimal(rowId++, value, precision);
+ if (!filteredRows[rowId]) {
+ putDecimal(counter++, value, precision);
+ }
+ rowId++;
}
}
@Override public void putDouble(int rowId, double value) {
- columnVector.putDouble(rowId, value);
+ if (!filteredRows[rowId]) {
+ columnVector.putDouble(counter++, value);
+ }
}
@Override public void putDoubles(int rowId, int count, double value) {
- columnVector.putDoubles(rowId, count, value);
+ if (filteredRowsExist) {
+ for (int i = 0; i < count; i++) {
+ if (!filteredRows[rowId]) {
+ putDouble(counter++, value);
+ }
+ rowId++;
+ }
+ } else {
+ columnVector.putDoubles(rowId, count, value);
+ }
}
@Override public void putBytes(int rowId, byte[] value) {
- columnVector.putByteArray(rowId, value);
+ if (!filteredRows[rowId]) {
+ columnVector.putByteArray(counter++, value);
+ }
}
@Override public void putBytes(int rowId, int count, byte[] value) {
for (int i = 0; i < count; i++) {
- putBytes(rowId++, value);
+ if (!filteredRows[rowId]) {
+ putBytes(counter++, value);
+ }
+ rowId++;
}
}
@Override public void putBytes(int rowId, int offset, int length, byte[] value) {
- columnVector.putByteArray(rowId, value, offset, length);
+ if (!filteredRows[rowId]) {
+ columnVector.putByteArray(counter++, value, offset, length);
+ }
}
@Override public void putNull(int rowId) {
- columnVector.putNull(rowId);
+ if (!filteredRows[rowId]) {
+ columnVector.putNull(counter++);
+ }
}
@Override public void putNulls(int rowId, int count) {
- columnVector.putNulls(rowId, count);
+ if (filteredRowsExist) {
+ for (int i = 0; i < count; i++) {
+ if (!filteredRows[rowId]) {
+ putNull(counter++);
+ }
+ rowId++;
+ }
+ } else {
+ columnVector.putNulls(rowId, count);
+ }
}
@Override public boolean isNull(int rowId) {
@@ -117,10 +195,15 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
}
@Override public void reset() {
-// columnVector.reset();
+ counter = 0;
+ filteredRowsExist = false;
}
@Override public DataType getType() {
return columnVector.dataType();
}
+
+ @Override public void setFilteredRowsExist(boolean filteredRowsExist) {
+ this.filteredRowsExist = filteredRowsExist;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1c5b4a55/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 3fdf9af..173c527 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -219,10 +219,11 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
columnarBatch = ColumnarBatch.allocate(new StructType(fields), memMode);
CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
+ boolean[] filteredRows = new boolean[columnarBatch.capacity()];
for (int i = 0; i < fields.length; i++) {
- vectors[i] = new ColumnarVectorWrapper(columnarBatch.column(i));
+ vectors[i] = new ColumnarVectorWrapper(columnarBatch.column(i), filteredRows);
}
- carbonColumnarBatch = new CarbonColumnarBatch(vectors, columnarBatch.capacity());
+ carbonColumnarBatch = new CarbonColumnarBatch(vectors, columnarBatch.capacity(), filteredRows);
}
private void initBatch() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1c5b4a55/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 0fb5c47..c9fc46c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -79,13 +79,7 @@ object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
}
}
-object CarbonIUDAnalysisRule extends Rule[LogicalPlan] {
-
- var sparkSession: SparkSession = _
-
- def init(sparkSession: SparkSession) {
- this.sparkSession = sparkSession
- }
+case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
private val parser = new SparkSqlParser(sparkSession.sessionState.conf)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1c5b4a55/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index e413840..156a12e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -67,7 +67,6 @@ class CarbonSessionCatalog(
lazy val carbonEnv = {
val env = new CarbonEnv
env.init(sparkSession)
- CarbonIUDAnalysisRule.init(sparkSession)
env
}
@@ -130,7 +129,7 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
catalog.ParquetConversions ::
catalog.OrcConversions ::
CarbonPreInsertionCasts ::
- CarbonIUDAnalysisRule ::
+ CarbonIUDAnalysisRule(sparkSession) ::
AnalyzeCreateTable(sparkSession) ::
PreprocessTableInsertion(conf) ::
DataSourceAnalysis(conf) ::
[2/2] carbondata git commit: Support delete operation in vector
reader of Spark 2.1 This closes #986
Posted by ch...@apache.org.
Support delete operation in vector reader of Spark 2.1 This closes #986
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ddb80f72
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ddb80f72
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ddb80f72
Branch: refs/heads/master
Commit: ddb80f72925f64741ab168409eb6df9533674ede
Parents: 891062e 1c5b4a5
Author: chenliang613 <ch...@apache.org>
Authored: Thu Jun 1 23:08:44 2017 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Thu Jun 1 23:08:44 2017 +0800
----------------------------------------------------------------------
.../DictionaryBasedVectorResultCollector.java | 5 +-
.../core/scan/result/AbstractScannedResult.java | 25 +++-
.../scan/result/vector/CarbonColumnVector.java | 2 +
.../scan/result/vector/CarbonColumnarBatch.java | 33 ++++-
.../dataload/TestBatchSortDataLoad.scala | 20 +--
.../iud/UpdateCarbonTableTestCase.scala | 2 +-
.../vectorreader/ColumnarVectorWrapper.java | 121 ++++++++++++++++---
.../VectorizedCarbonRecordReader.java | 5 +-
.../spark/sql/hive/CarbonAnalysisRules.scala | 8 +-
.../spark/sql/hive/CarbonSessionState.scala | 3 +-
10 files changed, 180 insertions(+), 44 deletions(-)
----------------------------------------------------------------------