You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/13 19:21:14 UTC
[9/9] carbondata git commit: [CARBONDATA-1520] [PreAgg] Support
pre-aggregate table load
[CARBONDATA-1520] [PreAgg] Support pre-aggregate table load
This closes #1446
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cc0e6f1e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cc0e6f1e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cc0e6f1e
Branch: refs/heads/master
Commit: cc0e6f1e77b39d712de0e6101b2d24e57c5b47cb
Parents: f7f516e
Author: kunal642 <ku...@gmail.com>
Authored: Thu Oct 26 17:09:54 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Nov 14 00:48:16 2017 +0530
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 6 +
.../core/indexstore/UnsafeMemoryDMStore.java | 14 +-
.../blockletindex/BlockletDataMap.java | 41 +++--
.../core/indexstore/row/DataMapRow.java | 6 +-
.../core/indexstore/row/DataMapRowImpl.java | 4 +-
.../core/indexstore/row/UnsafeDataMapRow.java | 8 +-
.../core/indexstore/schema/CarbonRowSchema.java | 124 +++++++++++++
.../core/indexstore/schema/DataMapSchema.java | 124 -------------
.../metadata/converter/SchemaConverter.java | 2 +-
.../core/metadata/schema/table/CarbonTable.java | 5 +
.../metadata/schema/table/DataMapSchema.java | 4 +-
.../schema/table/RelationIdentifier.java | 4 +
.../core/metadata/schema/table/TableInfo.java | 2 +-
.../core/scan/executor/util/QueryUtil.java | 44 +++--
.../carbondata/core/util/SessionParams.java | 13 ++
.../core/writer/CarbonIndexFileMergeWriter.java | 2 +-
.../carbondata/events/OperationListenerBus.java | 3 +-
.../hadoop/api/CarbonTableInputFormat.java | 19 +-
.../src/test/resources/sample.csv | 2 +
.../dataload/TestLoadDataGeneral.scala | 10 +-
.../TestLoadDataWithAutoLoadMerge.scala | 2 +-
.../TestLoadDataWithYarnLocalDirs.scala | 2 +-
.../preaggregate/TestPreAggregateLoad.scala | 172 +++++++++++++++++++
.../deleteTable/TestDeleteTableNewDDL.scala | 2 +
.../carbondata/spark/rdd/CarbonScanRDD.scala | 13 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 8 +-
.../spark/sql/CarbonDictionaryDecoder.scala | 24 ++-
.../command/management/LoadTableCommand.scala | 4 +
.../CreatePreAggregateTableCommand.scala | 5 +
.../DropPreAggregateTablePostListener.scala | 49 ------
.../preaaggregate/PreAggregateListeners.scala | 81 +++++++++
.../preaaggregate/PreAggregateUtil.scala | 23 ++-
.../spark/sql/hive/CarbonAnalysisRules.scala | 57 +++++-
.../spark/sql/hive/CarbonSessionState.scala | 7 +-
.../execution/command/CarbonHiveCommands.scala | 3 +
.../impl/DictionaryFieldConverterImpl.java | 8 +-
.../converter/impl/FieldEncoderFactory.java | 38 +++-
.../loading/model/CarbonLoadModel.java | 13 ++
38 files changed, 685 insertions(+), 263 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 014478f..e27e5bd 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -57,6 +57,12 @@ public final class CarbonCommonConstants {
public static final String CARBON_INPUT_SEGMENTS = "carbon.input.segments.";
/**
+ * Fetch and validate the segments.
+ * Used for aggregate table load as segment validation is not required.
+ */
+ public static final String VALIDATE_CARBON_INPUT_SEGMENTS = "validate.carbon.input.segments.";
+
+ /**
* location of the carbon member, hierarchy and fact files
*/
@CarbonProperty
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
index bf1678a..450796a 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -18,7 +18,7 @@ package org.apache.carbondata.core.indexstore;
import org.apache.carbondata.core.indexstore.row.DataMapRow;
import org.apache.carbondata.core.indexstore.row.UnsafeDataMapRow;
-import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
import org.apache.carbondata.core.memory.MemoryBlock;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
@@ -44,7 +44,7 @@ public class UnsafeMemoryDMStore {
private boolean isMemoryFreed;
- private DataMapSchema[] schema;
+ private CarbonRowSchema[] schema;
private int[] pointers;
@@ -52,7 +52,7 @@ public class UnsafeMemoryDMStore {
private final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
- public UnsafeMemoryDMStore(DataMapSchema[] schema) throws MemoryException {
+ public UnsafeMemoryDMStore(CarbonRowSchema[] schema) throws MemoryException {
this.schema = schema;
this.allocatedSize = capacity;
this.memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize);
@@ -101,7 +101,7 @@ public class UnsafeMemoryDMStore {
pointers[rowCount++] = pointer;
}
- private void addToUnsafe(DataMapSchema schema, DataMapRow row, int index) {
+ private void addToUnsafe(CarbonRowSchema schema, DataMapRow row, int index) {
switch (schema.getSchemaType()) {
case FIXED:
DataType dataType = schema.getDataType();
@@ -154,8 +154,8 @@ public class UnsafeMemoryDMStore {
runningLength += data.length;
break;
case STRUCT:
- DataMapSchema[] childSchemas =
- ((DataMapSchema.StructDataMapSchema) schema).getChildSchemas();
+ CarbonRowSchema[] childSchemas =
+ ((CarbonRowSchema.StructCarbonRowSchema) schema).getChildSchemas();
DataMapRow struct = row.getRow(index);
for (int i = 0; i < childSchemas.length; i++) {
addToUnsafe(childSchemas[i], struct, i);
@@ -200,7 +200,7 @@ public class UnsafeMemoryDMStore {
return runningLength;
}
- public DataMapSchema[] getSchema() {
+ public CarbonRowSchema[] getSchema() {
return schema;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 7829034..43e265d 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -45,7 +45,7 @@ import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
import org.apache.carbondata.core.indexstore.row.DataMapRow;
import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
-import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
@@ -132,7 +132,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
String filePath) {
int[] minMaxLen = segmentProperties.getColumnsValueSize();
List<BlockletInfo> blockletList = fileFooter.getBlockletList();
- DataMapSchema[] schema = unsafeMemoryDMStore.getSchema();
+ CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema();
for (int index = 0; index < blockletList.size(); index++) {
DataMapRow row = new DataMapRowImpl(schema);
int ordinal = 0;
@@ -256,9 +256,10 @@ public class BlockletDataMap implements DataMap, Cacheable {
return updatedValues;
}
- private DataMapRow addMinMax(int[] minMaxLen, DataMapSchema dataMapSchema, byte[][] minValues) {
- DataMapSchema[] minSchemas =
- ((DataMapSchema.StructDataMapSchema) dataMapSchema).getChildSchemas();
+ private DataMapRow addMinMax(int[] minMaxLen, CarbonRowSchema carbonRowSchema,
+ byte[][] minValues) {
+ CarbonRowSchema[] minSchemas =
+ ((CarbonRowSchema.StructCarbonRowSchema) carbonRowSchema).getChildSchemas();
DataMapRow minRow = new DataMapRowImpl(minSchemas);
int minOrdinal = 0;
// min value adding
@@ -269,46 +270,48 @@ public class BlockletDataMap implements DataMap, Cacheable {
}
private void createSchema(SegmentProperties segmentProperties) throws MemoryException {
- List<DataMapSchema> indexSchemas = new ArrayList<>();
+ List<CarbonRowSchema> indexSchemas = new ArrayList<>();
// Index key
- indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataTypes.BYTE_ARRAY));
+ indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
int[] minMaxLen = segmentProperties.getColumnsValueSize();
// do it 2 times, one for min and one for max.
for (int k = 0; k < 2; k++) {
- DataMapSchema[] mapSchemas = new DataMapSchema[minMaxLen.length];
+ CarbonRowSchema[] mapSchemas = new CarbonRowSchema[minMaxLen.length];
for (int i = 0; i < minMaxLen.length; i++) {
if (minMaxLen[i] <= 0) {
- mapSchemas[i] = new DataMapSchema.VariableDataMapSchema(DataTypes.BYTE_ARRAY);
+ mapSchemas[i] = new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY);
} else {
- mapSchemas[i] = new DataMapSchema.FixedDataMapSchema(DataTypes.BYTE_ARRAY, minMaxLen[i]);
+ mapSchemas[i] =
+ new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, minMaxLen[i]);
}
}
- DataMapSchema mapSchema = new DataMapSchema.StructDataMapSchema(
- DataTypes.createDefaultStructType(), mapSchemas);
+ CarbonRowSchema mapSchema =
+ new CarbonRowSchema.StructCarbonRowSchema(DataTypes.createDefaultStructType(),
+ mapSchemas);
indexSchemas.add(mapSchema);
}
// for number of rows.
- indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataTypes.INT));
+ indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.INT));
// for table block path
- indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataTypes.BYTE_ARRAY));
+ indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
// for number of pages.
- indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataTypes.SHORT));
+ indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
// for version number.
- indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataTypes.SHORT));
+ indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
// for schema updated time.
- indexSchemas.add(new DataMapSchema.FixedDataMapSchema(DataTypes.LONG));
+ indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
//for blocklet info
- indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataTypes.BYTE_ARRAY));
+ indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
unsafeMemoryDMStore =
- new UnsafeMemoryDMStore(indexSchemas.toArray(new DataMapSchema[indexSchemas.size()]));
+ new UnsafeMemoryDMStore(indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]));
}
@Override
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
index 631e0ad..b764bdf 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
@@ -16,7 +16,7 @@
*/
package org.apache.carbondata.core.indexstore.row;
-import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
/**
* It is just a normal row to store data. Implementation classes could be safe and unsafe.
@@ -24,9 +24,9 @@ import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
*/
public abstract class DataMapRow {
- protected DataMapSchema[] schemas;
+ protected CarbonRowSchema[] schemas;
- public DataMapRow(DataMapSchema[] schemas) {
+ public DataMapRow(CarbonRowSchema[] schemas) {
this.schemas = schemas;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
index 032b29e..0bb4a5c 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
@@ -16,7 +16,7 @@
*/
package org.apache.carbondata.core.indexstore.row;
-import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
/**
@@ -26,7 +26,7 @@ public class DataMapRowImpl extends DataMapRow {
private Object[] data;
- public DataMapRowImpl(DataMapSchema[] schemas) {
+ public DataMapRowImpl(CarbonRowSchema[] schemas) {
super(schemas);
this.data = new Object[schemas.length];
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
index 2c76990..932865d 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
@@ -17,7 +17,7 @@
package org.apache.carbondata.core.indexstore.row;
-import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
import org.apache.carbondata.core.memory.MemoryBlock;
import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET;
@@ -32,7 +32,7 @@ public class UnsafeDataMapRow extends DataMapRow {
private int pointer;
- public UnsafeDataMapRow(DataMapSchema[] schemas, MemoryBlock block, int pointer) {
+ public UnsafeDataMapRow(CarbonRowSchema[] schemas, MemoryBlock block, int pointer) {
super(schemas);
this.block = block;
this.pointer = pointer;
@@ -84,8 +84,8 @@ public class UnsafeDataMapRow extends DataMapRow {
}
@Override public DataMapRow getRow(int ordinal) {
- DataMapSchema[] childSchemas =
- ((DataMapSchema.StructDataMapSchema) schemas[ordinal]).getChildSchemas();
+ CarbonRowSchema[] childSchemas =
+ ((CarbonRowSchema.StructCarbonRowSchema) schemas[ordinal]).getChildSchemas();
return new UnsafeDataMapRow(childSchemas, block, pointer + getPosition(ordinal));
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
new file mode 100644
index 0000000..813be4a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.schema;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * It just have 2 types right now, either fixed or variable.
+ */
+public abstract class CarbonRowSchema {
+
+ protected DataType dataType;
+
+ public CarbonRowSchema(DataType dataType) {
+ this.dataType = dataType;
+ }
+
+ /**
+ * Either fixed or variable length.
+ *
+ * @return
+ */
+ public DataType getDataType() {
+ return dataType;
+ }
+
+ /**
+ * Gives length in case of fixed schema other wise returns length
+ *
+ * @return
+ */
+ public abstract int getLength();
+
+ /**
+ * schema type
+ * @return
+ */
+ public abstract DataMapSchemaType getSchemaType();
+
+ /*
+ * It has always fixed length, length cannot be updated later.
+ * Usage examples : all primitive types like short, int etc
+ */
+ public static class FixedCarbonRowSchema extends CarbonRowSchema {
+
+ private int length;
+
+ public FixedCarbonRowSchema(DataType dataType) {
+ super(dataType);
+ }
+
+ public FixedCarbonRowSchema(DataType dataType, int length) {
+ super(dataType);
+ this.length = length;
+ }
+
+ @Override public int getLength() {
+ if (length == 0) {
+ return dataType.getSizeInBytes();
+ } else {
+ return length;
+ }
+ }
+
+ @Override public DataMapSchemaType getSchemaType() {
+ return DataMapSchemaType.FIXED;
+ }
+ }
+
+ public static class VariableCarbonRowSchema extends CarbonRowSchema {
+
+ public VariableCarbonRowSchema(DataType dataType) {
+ super(dataType);
+ }
+
+ @Override public int getLength() {
+ return dataType.getSizeInBytes();
+ }
+
+ @Override public DataMapSchemaType getSchemaType() {
+ return DataMapSchemaType.VARIABLE;
+ }
+ }
+
+ public static class StructCarbonRowSchema extends CarbonRowSchema {
+
+ private CarbonRowSchema[] childSchemas;
+
+ public StructCarbonRowSchema(DataType dataType, CarbonRowSchema[] childSchemas) {
+ super(dataType);
+ this.childSchemas = childSchemas;
+ }
+
+ @Override public int getLength() {
+ return dataType.getSizeInBytes();
+ }
+
+ public CarbonRowSchema[] getChildSchemas() {
+ return childSchemas;
+ }
+
+ @Override public DataMapSchemaType getSchemaType() {
+ return DataMapSchemaType.STRUCT;
+ }
+ }
+
+ public enum DataMapSchemaType {
+ FIXED, VARIABLE, STRUCT
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java
deleted file mode 100644
index 80c68ac..0000000
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore.schema;
-
-import org.apache.carbondata.core.metadata.datatype.DataType;
-
-/**
- * It just have 2 types right now, either fixed or variable.
- */
-public abstract class DataMapSchema {
-
- protected DataType dataType;
-
- public DataMapSchema(DataType dataType) {
- this.dataType = dataType;
- }
-
- /**
- * Either fixed or variable length.
- *
- * @return
- */
- public DataType getDataType() {
- return dataType;
- }
-
- /**
- * Gives length in case of fixed schema other wise returns length
- *
- * @return
- */
- public abstract int getLength();
-
- /**
- * schema type
- * @return
- */
- public abstract DataMapSchemaType getSchemaType();
-
- /*
- * It has always fixed length, length cannot be updated later.
- * Usage examples : all primitive types like short, int etc
- */
- public static class FixedDataMapSchema extends DataMapSchema {
-
- private int length;
-
- public FixedDataMapSchema(DataType dataType) {
- super(dataType);
- }
-
- public FixedDataMapSchema(DataType dataType, int length) {
- super(dataType);
- this.length = length;
- }
-
- @Override public int getLength() {
- if (length == 0) {
- return dataType.getSizeInBytes();
- } else {
- return length;
- }
- }
-
- @Override public DataMapSchemaType getSchemaType() {
- return DataMapSchemaType.FIXED;
- }
- }
-
- public static class VariableDataMapSchema extends DataMapSchema {
-
- public VariableDataMapSchema(DataType dataType) {
- super(dataType);
- }
-
- @Override public int getLength() {
- return dataType.getSizeInBytes();
- }
-
- @Override public DataMapSchemaType getSchemaType() {
- return DataMapSchemaType.VARIABLE;
- }
- }
-
- public static class StructDataMapSchema extends DataMapSchema {
-
- private DataMapSchema[] childSchemas;
-
- public StructDataMapSchema(DataType dataType, DataMapSchema[] childSchemas) {
- super(dataType);
- this.childSchemas = childSchemas;
- }
-
- @Override public int getLength() {
- return dataType.getSizeInBytes();
- }
-
- public DataMapSchema[] getChildSchemas() {
- return childSchemas;
- }
-
- @Override public DataMapSchemaType getSchemaType() {
- return DataMapSchemaType.STRUCT;
- }
- }
-
- public enum DataMapSchemaType {
- FIXED, VARIABLE, STRUCT
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java
index bfbb6f7..af86253 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java
@@ -106,7 +106,7 @@ public interface SchemaConverter {
* method to convert thrift datamap schema object to wrapper
* data map object
* @param thriftchildSchema
- * @return DataMapSchema
+ * @return CarbonRowSchema
*/
DataMapSchema fromExternalToWrapperDataMapSchema(
org.apache.carbondata.format.DataMapSchema thriftchildSchema);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index e63f4e3..4a6fb8b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -702,4 +702,9 @@ public class CarbonTable implements Serializable {
this.dimensionOrdinalMax = dimensionOrdinalMax;
}
+ public boolean isPreAggregateTable() {
+ return tableInfo.getParentRelationIdentifiers() != null && !tableInfo
+ .getParentRelationIdentifiers().isEmpty();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
index 5f92ec8..9c71e37 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
@@ -28,6 +28,8 @@ import java.util.Map;
*/
public class DataMapSchema implements Serializable, Writable {
+ private static final long serialVersionUID = 6577149126264181553L;
+
private String className;
private RelationIdentifier relationIdentifier;
@@ -100,7 +102,7 @@ public class DataMapSchema implements Serializable, Writable {
this.className = in.readUTF();
boolean isRelationIdnentifierExists = in.readBoolean();
if (isRelationIdnentifierExists) {
- this.relationIdentifier = new RelationIdentifier(null, null, null);
+ this.relationIdentifier = new RelationIdentifier();
this.relationIdentifier.readFields(in);
}
boolean isChildSchemaExists = in.readBoolean();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/RelationIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/RelationIdentifier.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/RelationIdentifier.java
index 9a70b8b..2a2d937 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/RelationIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/RelationIdentifier.java
@@ -38,6 +38,10 @@ public class RelationIdentifier implements Serializable, Writable {
this.tableId = tableId;
}
+ public RelationIdentifier() {
+ this(null, null, null);
+ }
+
public String getDatabaseName() {
return databaseName;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
index 3acd6d6..44d8126 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
@@ -261,7 +261,7 @@ public class TableInfo implements Serializable, Writable {
out.writeUTF(metaDataFilepath);
out.writeUTF(storePath);
boolean isChildSchemaExists =
- null != dataMapSchemaList && dataMapSchemaList.size() > 0 ? true : false;
+ null != dataMapSchemaList && dataMapSchemaList.size() > 0;
out.writeBoolean(isChildSchemaExists);
if (isChildSchemaExists) {
out.writeShort(dataMapSchemaList.size());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
index b090e59..4d13462 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
@@ -42,6 +42,7 @@ import org.apache.carbondata.core.keygenerator.KeyGenerator;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.CarbonMetadata;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnIdentifier;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -353,14 +354,15 @@ public class QueryUtil {
CacheProvider cacheProvider = CacheProvider.getInstance();
Cache<DictionaryColumnUniqueIdentifier, Dictionary> forwardDictionaryCache = cacheProvider
.createCache(CacheType.FORWARD_DICTIONARY, absoluteTableIdentifier.getStorePath());
-
List<Dictionary> columnDictionaryList =
forwardDictionaryCache.getAll(dictionaryColumnUniqueIdentifiers);
Map<String, Dictionary> columnDictionaryMap = new HashMap<>(columnDictionaryList.size());
for (int i = 0; i < dictionaryColumnUniqueIdentifiers.size(); i++) {
// TODO: null check for column dictionary, if cache size is less it
// might return null here, in that case throw exception
- columnDictionaryMap.put(dictionaryColumnIdList.get(i), columnDictionaryList.get(i));
+ columnDictionaryMap
+ .put(dictionaryColumnUniqueIdentifiers.get(i).getColumnIdentifier().getColumnId(),
+ columnDictionaryList.get(i));
}
return columnDictionaryMap;
}
@@ -376,27 +378,47 @@ public class QueryUtil {
List<String> dictionaryColumnIdList, CarbonTableIdentifier carbonTableIdentifier,
TableProvider tableProvider) throws IOException {
CarbonTable carbonTable = tableProvider.getCarbonTable(carbonTableIdentifier);
- CarbonTablePath carbonTablePath =
- CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath(), carbonTableIdentifier);
List<DictionaryColumnUniqueIdentifier> dictionaryColumnUniqueIdentifiers =
new ArrayList<>(dictionaryColumnIdList.size());
for (String columnId : dictionaryColumnIdList) {
CarbonDimension dimension = CarbonMetadata.getInstance()
.getCarbonDimensionBasedOnColIdentifier(carbonTable, columnId);
if (dimension != null) {
+ CarbonTableIdentifier newCarbonTableIdentifier;
+ ColumnIdentifier columnIdentifier;
+ if (null != dimension.getColumnSchema().getParentColumnTableRelations() && !dimension
+ .getColumnSchema().getParentColumnTableRelations().isEmpty()) {
+ newCarbonTableIdentifier = getTableIdentifierForColumn(dimension);
+ columnIdentifier = new ColumnIdentifier(
+ dimension.getColumnSchema().getParentColumnTableRelations().get(0).getColumnId(),
+ dimension.getColumnProperties(), dimension.getDataType());
+ } else {
+ newCarbonTableIdentifier = carbonTableIdentifier;
+ columnIdentifier = dimension.getColumnIdentifier();
+ }
+ CarbonTablePath newCarbonTablePath = CarbonStorePath
+ .getCarbonTablePath(carbonTable.getStorePath(), newCarbonTableIdentifier);
+
dictionaryColumnUniqueIdentifiers.add(
- new DictionaryColumnUniqueIdentifier(
- carbonTableIdentifier,
- dimension.getColumnIdentifier(),
- dimension.getDataType(),
- carbonTablePath
- )
- );
+ new DictionaryColumnUniqueIdentifier(newCarbonTableIdentifier, columnIdentifier,
+ dimension.getDataType(), newCarbonTablePath));
}
}
return dictionaryColumnUniqueIdentifiers;
}
+ public static CarbonTableIdentifier getTableIdentifierForColumn(CarbonDimension carbonDimension) {
+ String parentTableName =
+ carbonDimension.getColumnSchema().getParentColumnTableRelations().get(0)
+ .getRelationIdentifier().getTableName();
+ String parentDatabaseName =
+ carbonDimension.getColumnSchema().getParentColumnTableRelations().get(0)
+ .getRelationIdentifier().getDatabaseName();
+ String parentTableId = carbonDimension.getColumnSchema().getParentColumnTableRelations().get(0)
+ .getRelationIdentifier().getTableId();
+ return new CarbonTableIdentifier(parentDatabaseName, parentTableName, parentTableId);
+ }
+
/**
* Below method will used to get the method will be used to get the measure
* block indexes to be read from the file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 1a91272..5dda9e4 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -66,6 +66,13 @@ public class SessionParams implements Serializable {
return sProps.get(key);
}
+ public String getProperty(String key, String defaultValue) {
+ if (!sProps.containsKey(key)) {
+ return defaultValue;
+ }
+ return sProps.get(key);
+ }
+
/**
* This method will be used to add a new property
*
@@ -172,6 +179,8 @@ public class SessionParams implements Serializable {
if (!isValid) {
throw new InvalidConfigurationException("Invalid CARBON_INPUT_SEGMENT_IDs");
}
+ } else if (key.startsWith(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS)) {
+ isValid = true;
} else {
throw new InvalidConfigurationException(
"The key " + key + " not supported for dynamic configuration.");
@@ -180,6 +189,10 @@ public class SessionParams implements Serializable {
return isValid;
}
+ public void removeProperty(String property) {
+ sProps.remove(property);
+ }
+
/**
* clear the set properties
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index 8d9bddc..e19ab24 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -49,7 +49,7 @@ public class CarbonIndexFileMergeWriter {
return file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT);
}
});
- if (indexFiles.length > 0) {
+ if (indexFiles != null && indexFiles.length > 0) {
SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
fileStore.readAllIIndexOfSegment(segmentPath);
openThriftWriter(segmentPath + "/" +
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java b/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java
index 01ecb04..321ddd5 100644
--- a/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java
+++ b/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java
@@ -53,7 +53,7 @@ public class OperationListenerBus {
* @param eventClass
* @param operationEventListener
*/
- public void addListener(Class<? extends Event> eventClass,
+ public OperationListenerBus addListener(Class<? extends Event> eventClass,
OperationEventListener operationEventListener) {
String eventType = eventClass.getName();
@@ -63,6 +63,7 @@ public class OperationListenerBus {
eventMap.put(eventType, operationEventListeners);
}
operationEventListeners.add(operationEventListener);
+ return INSTANCE;
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 57359fc..92ef6da 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -263,6 +263,14 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
configuration.set(CarbonTableInputFormat.VALIDATE_INPUT_SEGMENT_IDs, validate.toString());
}
+ public static void setAggeragateTableSegments(Configuration configuration, String segments) {
+ configuration.set(CarbonCommonConstants.CARBON_INPUT_SEGMENTS, segments);
+ }
+
+ private static String getAggeragateTableSegments(Configuration configuration) {
+ return configuration.get(CarbonCommonConstants.CARBON_INPUT_SEGMENTS);
+ }
+
/**
* get list of segment to access
*/
@@ -300,6 +308,11 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
@Override public List<InputSplit> getSplits(JobContext job) throws IOException {
AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
+ CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
+ if (null == carbonTable) {
+ throw new IOException("Missing/Corrupt schema file for table.");
+ }
+ String aggregateTableSegments = getAggeragateTableSegments(job.getConfiguration());
TableDataMap blockletMap =
DataMapStoreManager.getInstance().getDataMap(identifier, BlockletDataMap.NAME,
BlockletDataMapFactory.class.getName());
@@ -352,6 +365,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
if (invalidSegments.size() > 0) {
blockletMap.clear(invalidSegments);
}
+ } else {
+ filteredSegmentToAccess = Arrays.asList(aggregateTableSegments.split(","));
}
// Clean the updated segments from memory if the update happens on segments
@@ -376,12 +391,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
// process and resolve the expression
Expression filter = getFilterPredicates(job.getConfiguration());
- CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
TableProvider tableProvider = new SingleTableProvider(carbonTable);
// this will be null in case of corrupt schema file.
- if (null == carbonTable) {
- throw new IOException("Missing/Corrupt schema file for table.");
- }
PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName());
CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark-common-test/src/test/resources/sample.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/sample.csv b/integration/spark-common-test/src/test/resources/sample.csv
index 7c57de7..06985e8 100644
--- a/integration/spark-common-test/src/test/resources/sample.csv
+++ b/integration/spark-common-test/src/test/resources/sample.csv
@@ -3,3 +3,5 @@ id,name,city,age
2,eason,shenzhen,27
3,jarry,wuhan,35
3,jarry,Bangalore,35
+4,kunal,Delhi,26
+4,vishal,Bangalore,29
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index b90a5ea..23d1292 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -65,7 +65,7 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest")
checkAnswer(
sql("SELECT COUNT(*) FROM loadtest"),
- Seq(Row(4))
+ Seq(Row(6))
)
}
@@ -74,7 +74,7 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest")
checkAnswer(
sql("SELECT COUNT(*) FROM loadtest"),
- Seq(Row(8))
+ Seq(Row(10))
)
}
@@ -83,7 +83,7 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest")
checkAnswer(
sql("SELECT COUNT(*) FROM loadtest"),
- Seq(Row(12))
+ Seq(Row(14))
)
}
@@ -92,7 +92,7 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest")
checkAnswer(
sql("SELECT COUNT(*) FROM loadtest"),
- Seq(Row(16))
+ Seq(Row(18))
)
}
@@ -101,7 +101,7 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest options ('delimiter'='\\017')")
checkAnswer(
sql("SELECT COUNT(*) FROM loadtest"),
- Seq(Row(20))
+ Seq(Row(22))
)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithAutoLoadMerge.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithAutoLoadMerge.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithAutoLoadMerge.scala
index 5211e9e..51e84d4 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithAutoLoadMerge.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithAutoLoadMerge.scala
@@ -44,7 +44,7 @@ class TestLoadDataWithAutoLoadMerge extends QueryTest with BeforeAndAfterAll {
sql(s"LOAD DATA LOCAL INPATH '$testData' into table automerge")
checkAnswer(
sql("SELECT COUNT(*) FROM automerge"),
- Seq(Row(4))
+ Seq(Row(6))
)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala
index e92a7fd..ff415ae 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala
@@ -87,7 +87,7 @@ class TestLoadDataWithYarnLocalDirs extends QueryTest with BeforeAndAfterAll {
disableMultipleDir
checkAnswer(sql("select id from carbontable_yarnLocalDirs"),
- Seq(Row(1), Row(2), Row(3), Row(3)))
+ Seq(Row(1), Row(2), Row(3), Row(3), Row(4), Row(4)))
cleanUpYarnLocalDir
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
new file mode 100644
index 0000000..0c65577
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.integration.spark.testsuite.preaggregate
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestPreAggregateLoad extends QueryTest with BeforeAndAfterAll {
+
+ val testData = s"$resourcesPath/sample.csv"
+
+ override def beforeAll(): Unit = {
+ sql("DROP TABLE IF EXISTS maintable")
+ }
+
+ private def createAllAggregateTables(parentTableName: String): Unit = {
+ sql(
+ s"""create table ${ parentTableName }_preagg_sum stored BY 'carbondata' tblproperties
+ |('parent'='$parentTableName') as select id,sum(age) from $parentTableName group by id"""
+ .stripMargin)
+ sql(
+ s"""create table ${ parentTableName }_preagg_avg stored BY 'carbondata' tblproperties
+ |('parent'='$parentTableName') as select id,avg(age) from $parentTableName group by id"""
+ .stripMargin)
+ sql(
+ s"""create table ${ parentTableName }_preagg_count stored BY 'carbondata' tblproperties
+ |('parent'='$parentTableName') as select id,count(age) from $parentTableName group by id"""
+ .stripMargin)
+ sql(
+ s"""create table ${ parentTableName }_preagg_min stored BY 'carbondata' tblproperties
+ |('parent'='$parentTableName') as select id,min(age) from $parentTableName group by id"""
+ .stripMargin)
+ sql(
+ s"""create table ${ parentTableName }_preagg_max stored BY 'carbondata' tblproperties
+ |('parent'='$parentTableName') as select id,max(age) from $parentTableName group by id"""
+ .stripMargin)
+ }
+
+ test("test load into main table with pre-aggregate table") {
+ sql("DROP TABLE IF EXISTS maintable")
+ sql(
+ """
+ | CREATE TABLE maintable(id int, name string, city string, age int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ createAllAggregateTables("maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ checkAnswer(sql(s"select * from maintable_preagg_sum"),
+ Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
+ checkAnswer(sql(s"select * from maintable_preagg_avg"),
+ Seq(Row(1, 31, 1), Row(2, 27, 1), Row(3, 70, 2), Row(4, 55, 2)))
+ checkAnswer(sql(s"select * from maintable_preagg_count"),
+ Seq(Row(1, 1), Row(2, 1), Row(3, 2), Row(4, 2)))
+ checkAnswer(sql(s"select * from maintable_preagg_min"),
+ Seq(Row(1, 31), Row(2, 27), Row(3, 35), Row(4, 26)))
+ checkAnswer(sql(s"select * from maintable_preagg_max"),
+ Seq(Row(1, 31), Row(2, 27), Row(3, 35), Row(4, 29)))
+ sql("drop table if exists maintable")
+ }
+
+ test("test load into main table with pre-aggregate table with dictionary_include") {
+ sql("drop table if exists maintable")
+ sql(
+ """
+ | CREATE TABLE maintable(id int, name string, city string, age int)
+ | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('dictionary_include'='id')
+ """.stripMargin)
+ createAllAggregateTables("maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ checkAnswer(sql(s"select * from maintable_preagg_sum"),
+ Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
+ checkAnswer(sql(s"select * from maintable_preagg_avg"),
+ Seq(Row(1, 31, 1), Row(2, 27, 1), Row(3, 70, 2), Row(4, 55,2)))
+ checkAnswer(sql(s"select * from maintable_preagg_count"),
+ Seq(Row(1, 1), Row(2, 1), Row(3, 2), Row(4, 2)))
+ checkAnswer(sql(s"select * from maintable_preagg_min"),
+ Seq(Row(1, 31), Row(2, 27), Row(3, 35), Row(4, 26)))
+ checkAnswer(sql(s"select * from maintable_preagg_max"),
+ Seq(Row(1, 31), Row(2, 27), Row(3, 35), Row(4, 29)))
+ sql("drop table if exists maintable")
+ }
+
+ test("test load into main table with pre-aggregate table with single_pass") {
+ sql("drop table if exists maintable")
+ sql(
+ """
+ | CREATE TABLE maintable(id int, name string, city string, age int)
+ | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('dictionary_include'='id')
+ """.stripMargin)
+ createAllAggregateTables("maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable options('single_pass'='true')")
+ checkAnswer(sql(s"select * from maintable_preagg_sum"),
+ Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
+ checkAnswer(sql(s"select * from maintable_preagg_avg"),
+ Seq(Row(1, 31, 1), Row(2, 27, 1), Row(3, 70, 2), Row(4, 55,2)))
+ checkAnswer(sql(s"select * from maintable_preagg_count"),
+ Seq(Row(1, 1), Row(2, 1), Row(3, 2), Row(4, 2)))
+ checkAnswer(sql(s"select * from maintable_preagg_min"),
+ Seq(Row(1, 31), Row(2, 27), Row(3, 35), Row(4, 26)))
+ checkAnswer(sql(s"select * from maintable_preagg_max"),
+ Seq(Row(1, 31), Row(2, 27), Row(3, 35), Row(4, 29)))
+ sql("drop table if exists maintable")
+ }
+
+ test("test load into main table with incremental load") {
+ sql("drop table if exists maintable")
+ sql(
+ """
+ | CREATE TABLE maintable(id int, name string, city string, age int)
+ | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('dictionary_include'='id')
+ """.stripMargin)
+ createAllAggregateTables("maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ checkAnswer(sql(s"select * from maintable_preagg_sum"),
+ Seq(Row(1, 31),
+ Row(2, 27),
+ Row(3, 70),
+ Row(4, 55),
+ Row(1, 31),
+ Row(2, 27),
+ Row(3, 70),
+ Row(4, 55)))
+ checkAnswer(sql(s"select * from maintable_preagg_avg"),
+ Seq(Row(1, 31, 1),
+ Row(2, 27, 1),
+ Row(3, 70, 2),
+ Row(4, 55, 2),
+ Row(1, 31, 1),
+ Row(2, 27, 1),
+ Row(3, 70, 2),
+ Row(4, 55, 2)))
+ checkAnswer(sql(s"select * from maintable_preagg_count"),
+ Seq(Row(1, 1), Row(2, 1), Row(3, 2), Row(4, 2), Row(1, 1), Row(2, 1), Row(3, 2), Row(4, 2)))
+ checkAnswer(sql(s"select * from maintable_preagg_min"),
+ Seq(Row(1, 31),
+ Row(2, 27),
+ Row(3, 35),
+ Row(4, 26),
+ Row(1, 31),
+ Row(2, 27),
+ Row(3, 35),
+ Row(4, 26)))
+ checkAnswer(sql(s"select * from maintable_preagg_max"),
+ Seq(Row(1, 31),
+ Row(2, 27),
+ Row(3, 35),
+ Row(4, 29),
+ Row(1, 31),
+ Row(2, 27),
+ Row(3, 35),
+ Row(4, 29)))
+ sql("drop table if exists maintable")
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/deleteTable/TestDeleteTableNewDDL.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/deleteTable/TestDeleteTableNewDDL.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/deleteTable/TestDeleteTableNewDDL.scala
index 485b94b..b82a0af 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/deleteTable/TestDeleteTableNewDDL.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/deleteTable/TestDeleteTableNewDDL.scala
@@ -182,6 +182,7 @@ class TestDeleteTableNewDDL extends QueryTest with BeforeAndAfterAll {
test("drop table and create table with dictionary exclude string scenario") {
try {
+
sql("create database test")
sql(
"CREATE table test.dropTableTest3 (ID int, date String, country String, name " +
@@ -242,6 +243,7 @@ class TestDeleteTableNewDDL extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists dropTableTest4")
sql("drop table if exists table1")
sql("drop table if exists table2")
+ sql("drop database if exists test cascade")
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 5e46417..52a31a9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -41,7 +41,7 @@ import org.apache.carbondata.core.scan.expression.Expression
import org.apache.carbondata.core.scan.model.QueryModel
import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder}
import org.apache.carbondata.core.statusmanager.FileFormat
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, DataTypeUtil, TaskMetricsMap}
+import org.apache.carbondata.core.util._
import org.apache.carbondata.hadoop._
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
@@ -357,6 +357,17 @@ class CarbonScanRDD(
CarbonTableInputFormat
.setSegmentsToAccess(conf, segmentNumbersFromProperty.split(",").toList.asJava)
}
+ val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+ if (carbonSessionInfo != null) {
+ CarbonTableInputFormat.setAggeragateTableSegments(conf, carbonSessionInfo.getSessionParams
+ .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+ identifier.getCarbonTableIdentifier.getDatabaseName + "." +
+ identifier.getCarbonTableIdentifier.getTableName, ""))
+ CarbonTableInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getSessionParams
+ .getProperty(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+ identifier.getCarbonTableIdentifier.getDatabaseName + "." +
+ identifier.getCarbonTableIdentifier.getTableName, "true").toBoolean)
+ }
format
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 1083669..9899be1 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -265,8 +265,7 @@ object CarbonDataRDDFactory {
result: Option[DictionaryServer],
overwriteTable: Boolean,
dataFrame: Option[DataFrame] = None,
- updateModel: Option[UpdateTableModel] = None
- ): Unit = {
+ updateModel: Option[UpdateTableModel] = None): Unit = {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val operationContext = new OperationContext
// for handling of the segment Merging.
@@ -350,7 +349,7 @@ object CarbonDataRDDFactory {
if (value == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
} else if (value == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
- !loadStatus.equals(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) {
+ !loadStatus.equals(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) {
loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
}
}
@@ -464,7 +463,8 @@ object CarbonDataRDDFactory {
throw new Exception(status(0)._2._2.errorMsg)
}
// if segment is empty then fail the data load
- if (!CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
+ if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isPreAggregateTable &&
+ !CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
// update the load entry in table status file for changing the status to failure
CommonUtil.updateTableStatusForFailure(carbonLoadModel)
LOGGER.info("********starting clean up**********")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 18f76d1..2671aad 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -34,10 +34,11 @@ import org.apache.spark.sql.types._
import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, ColumnIdentifier}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, ColumnIdentifier}
import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.scan.executor.util.QueryUtil
import org.apache.carbondata.core.util.DataTypeUtil
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.spark.CarbonAliasDecoderRelation
@@ -271,11 +272,24 @@ case class CarbonDictionaryDecoder(
case (tableName, columnIdentifier, carbonDimension) =>
if (columnIdentifier != null) {
try {
+ val (newCarbonTableIdentifier, newColumnIdentifier) =
+ if (null != carbonDimension.getColumnSchema.getParentColumnTableRelations &&
+ !carbonDimension
+ .getColumnSchema.getParentColumnTableRelations.isEmpty) {
+ (QueryUtil.getTableIdentifierForColumn(carbonDimension),
+ new ColumnIdentifier(carbonDimension.getColumnSchema
+ .getParentColumnTableRelations.get(0).getColumnId,
+ carbonDimension.getColumnProperties,
+ carbonDimension.getDataType))
+ } else {
+ (atiMap(tableName).getCarbonTableIdentifier, columnIdentifier)
+ }
val dictionaryColumnUniqueIdentifier = new DictionaryColumnUniqueIdentifier(
- atiMap(tableName).getCarbonTableIdentifier,
- columnIdentifier, carbonDimension.getDataType,
- CarbonStorePath.getCarbonTablePath(atiMap(tableName)))
- allDictIdentifiers += dictionaryColumnUniqueIdentifier;
+ newCarbonTableIdentifier,
+ newColumnIdentifier, carbonDimension.getDataType,
+ CarbonStorePath
+ .getCarbonTablePath(atiMap(tableName).getStorePath, newCarbonTableIdentifier))
+ allDictIdentifiers += dictionaryColumnUniqueIdentifier
new ForwardDictionaryWrapper(
storePath,
dictionaryColumnUniqueIdentifier)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
index 222c30d..b28ec10 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
@@ -152,6 +152,10 @@ case class LoadTableCommand(
LOGGER.audit(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
carbonLoadModel.setUseOnePass(false)
}
+ // if table is an aggregate table then disable single pass.
+ if (carbonLoadModel.isAggLoadRequest) {
+ carbonLoadModel.setUseOnePass(false)
+ }
// Create table and metadata folders if not exist
val carbonTablePath = CarbonStorePath
.getCarbonTablePath(storePath, table.getCarbonTableIdentifier)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index e42e933..b952285 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -115,6 +115,11 @@ case class CreatePreAggregateTableCommand(
.buildChildSchema("", tableInfo.getDatabaseName, queryString, "AGGREGATION")
 // updating the parent table about child table
PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)
+ val loadAvailable = PreAggregateUtil
+ .checkMainTableLoad(parentTable)
+ if (loadAvailable) {
+ sparkSession.sql(s"insert into ${ cm.databaseName }.${ cm.tableName } $queryString")
+ }
} catch {
case e: Exception =>
val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/DropPreAggregateTablePostListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/DropPreAggregateTablePostListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/DropPreAggregateTablePostListener.scala
deleted file mode 100644
index 7127c46..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/DropPreAggregateTablePostListener.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.preaaggregate
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.execution.command.CarbonDropTableCommand
-
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
-import org.apache.carbondata.events.{DropTablePostEvent, Event, OperationContext, OperationEventListener}
-
-class DropPreAggregateTablePostListener extends OperationEventListener {
-
- /**
- * Called on a specified event occurrence
- *
- * @param event
- */
- override def onEvent(event: Event, operationContext: OperationContext): Unit = {
- val dropPostEvent = event.asInstanceOf[DropTablePostEvent]
- val carbonTable = dropPostEvent.carbonTable
- val sparkSession = dropPostEvent.sparkSession
- if (carbonTable.isDefined && carbonTable.get.getTableInfo.getDataMapSchemaList != null &&
- !carbonTable.get.getTableInfo.getDataMapSchemaList.isEmpty) {
- val childSchemas = carbonTable.get.getTableInfo.getDataMapSchemaList
- for (childSchema: DataMapSchema <- childSchemas.asScala) {
- CarbonDropTableCommand(ifExistsSet = true,
- Some(childSchema.getRelationIdentifier.getDatabaseName),
- childSchema.getRelationIdentifier.getTableName).run(sparkSession)
- }
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
new file mode 100644
index 0000000..b507856
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.preaaggregate
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.CarbonSession
+import org.apache.spark.sql.execution.command.CarbonDropTableCommand
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.events.{DropTablePostEvent, Event, LoadTablePostExecutionEvent, OperationContext, OperationEventListener}
+
+// Listener wired to DropTablePostEvent (see CarbonSessionState.init in this
+// commit): when a main table is dropped, drop its pre-aggregate child tables
+// too, so no orphan aggregate tables remain. Stateless, hence an object.
+object DropPreAggregateTablePostListener extends OperationEventListener {
+
+ /**
+ * Called after a table drop completes. If the dropped table's TableInfo
+ * carries a non-empty DataMapSchemaList, every child table referenced by a
+ * DataMapSchema's RelationIdentifier is dropped via CarbonDropTableCommand.
+ * ifExistsSet = true makes each child drop a no-op if the child is already
+ * gone, so a partially-cleaned state does not fail the listener.
+ *
+ * @param event expected to be a DropTablePostEvent (unchecked cast below)
+ * @param operationContext listener bus context; not used by this listener
+ */
+ override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+ val dropPostEvent = event.asInstanceOf[DropTablePostEvent]
+ val carbonTable = dropPostEvent.carbonTable
+ val sparkSession = dropPostEvent.sparkSession
+ // carbonTable is an Option; also guard against a null/empty schema list.
+ if (carbonTable.isDefined && carbonTable.get.getTableInfo.getDataMapSchemaList != null &&
+ !carbonTable.get.getTableInfo.getDataMapSchemaList.isEmpty) {
+ val childSchemas = carbonTable.get.getTableInfo.getDataMapSchemaList
+ for (childSchema: DataMapSchema <- childSchemas.asScala) {
+ CarbonDropTableCommand(ifExistsSet = true,
+ Some(childSchema.getRelationIdentifier.getDatabaseName),
+ childSchema.getRelationIdentifier.getTableName).run(sparkSession)
+ }
+ }
+
+ }
+}
+
+// Listener wired to LoadTablePostExecutionEvent: after a load into a main
+// table succeeds, propagate the newly loaded data into every pre-aggregate
+// child table by running the child's stored select query as an INSERT INTO.
+object LoadPostAggregateListener extends OperationEventListener {
+ /**
+ * Called after a main-table load finishes. For each child DataMapSchema:
+ * restricts reads of the parent table to the segment just loaded (via the
+ * thread-local carbon.input.segments.<db>.<table> property), disables
+ * segment validation, then inserts the child's select-query result into
+ * the child table.
+ *
+ * NOTE(review): both threadSet keys depend only on the parent table, so
+ * they are loop-invariant and could be hoisted out of the loop; they also
+ * appear never to be unset afterwards — confirm the thread-local scope is
+ * cleared by the caller.
+ *
+ * @param event expected to be a LoadTablePostExecutionEvent (unchecked cast)
+ * @param operationContext listener bus context; not used by this listener
+ */
+ override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+ val loadEvent = event.asInstanceOf[LoadTablePostExecutionEvent]
+ val sparkSession = loadEvent.sparkSession
+ val carbonLoadModel = loadEvent.carbonLoadModel
+ val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ if (!table.getTableInfo.getDataMapSchemaList.isEmpty) {
+ for (dataMapSchema: DataMapSchema <- table.getTableInfo.getDataMapSchemaList.asScala) {
+ // Scope the child's select query to the segment produced by this load.
+ CarbonSession
+ .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+ carbonLoadModel.getDatabaseName + "." +
+ carbonLoadModel.getTableName,
+ carbonLoadModel.getSegmentId)
+ // Skip segment-id validation for this thread-scoped read.
+ CarbonSession.threadSet(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+ carbonLoadModel.getDatabaseName + "." +
+ carbonLoadModel.getTableName, "false")
+ val childTableName = dataMapSchema.getRelationIdentifier.getTableName
+ val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName
+ // The property key literally contains a space ("CHILD_SELECT QUERY");
+ // it must match the key used when the schema was created.
+ val selectQuery = dataMapSchema.getProperties.get("CHILD_SELECT QUERY")
+ sparkSession.sql(s"insert into $childDatabaseName.$childTableName $selectQuery")
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index fd0e543..b35b525 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command.{ColumnTableRelation, DataMapField, Field}
import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
+import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX}
import org.apache.spark.sql.types.DataType
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.format.TableInfo
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -428,4 +429,24 @@ object PreAggregateUtil {
thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
}
}
+
+ /**
+ * Resolves the CarbonTable for a pre-aggregate child table. Tries the
+ * metastore's in-memory metadata cache first; on a miss, falls back to a
+ * full lookupRelation through the session catalog.
+ *
+ * NOTE(review): the catch swallows every Exception and maps it to None, so
+ * "table does not exist" and genuine lookup failures are indistinguishable
+ * to the caller — confirm this best-effort contract is intended.
+ *
+ * @param databaseName database of the child table
+ * @param tableName name of the child table
+ * @param sparkSession active session used for metastore access
+ * @return Some(CarbonTable) if resolvable, None otherwise
+ */
+ def getChildCarbonTable(databaseName: String, tableName: String)
+ (sparkSession: SparkSession): Option[CarbonTable] = {
+ val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ metaStore.getTableFromMetadataCache(databaseName, tableName) match {
+ case Some(tableMeta) => Some(tableMeta.carbonTable)
+ case None => try {
+ Some(metaStore.lookupRelation(Some(databaseName), tableName)(sparkSession)
+ .asInstanceOf[CarbonRelation].metaData.carbonTable)
+ } catch {
+ case _: Exception =>
+ None
+ }
+ }
+ }
+
+ /**
+ * Returns true if the main table already has at least one load recorded in
+ * its load-metadata (tablestatus) file, i.e. data has been loaded before.
+ *
+ * @param carbonTable the main (parent) table to inspect
+ */
+ def checkMainTableLoad(carbonTable: CarbonTable): Boolean = {
+ SegmentStatusManager.readLoadMetadata(
+ carbonTable.getMetaDataFilepath).nonEmpty
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index f61ab84..ba7e1eb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -20,7 +20,8 @@ package org.apache.spark.sql.hive
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, ExprId, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{DeclarativeAggregate, _}
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
@@ -57,8 +58,16 @@ object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
.DEFAULT_MAX_NUMBER_OF_COLUMNS
)
}
- if (child.output.size >= relation.carbonRelation.output.size) {
- val newChildOutput = child.output.zipWithIndex.map { columnWithIndex =>
+ val isAggregateTable = !relation.carbonRelation.tableMeta.carbonTable.getTableInfo
+ .getParentRelationIdentifiers.isEmpty
+ // transform logical plan if the load is for aggregate table.
+ val childPlan = if (isAggregateTable) {
+ transformAggregatePlan(child)
+ } else {
+ child
+ }
+ if (childPlan.output.size >= relation.carbonRelation.output.size) {
+ val newChildOutput = childPlan.output.zipWithIndex.map { columnWithIndex =>
columnWithIndex._1 match {
case attr: Alias =>
Alias(attr.child, s"col${ columnWithIndex._2 }")(attr.exprId)
@@ -67,16 +76,54 @@ object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
case attr => attr
}
}
- val newChild: LogicalPlan = if (newChildOutput == child.output) {
+ val newChild: LogicalPlan = if (newChildOutput == childPlan.output) {
p.child
} else {
- Project(newChildOutput, child)
+ Project(newChildOutput, childPlan)
}
InsertIntoCarbonTable(relation, p.partition, newChild, p.overwrite, p.ifNotExists)
} else {
sys.error("Cannot insert into target table because column number are different")
}
}
+
+ /**
+ * Rewrites Aggregate nodes for loads into a pre-aggregate table: every
+ * avg(col) aggregate expression is replaced by the pair sum(col) and
+ * count(col), each aliased to the original column name with a fresh
+ * ExprId, so the child table stores the components from which an average
+ * can later be reconstructed. Both the bare-attribute and Cast-wrapped
+ * forms of Average are handled; other DeclarativeAggregate functions are
+ * kept as-is, and non-aggregate named expressions pass through unchanged.
+ *
+ * NOTE(review): the final `case _ => Nil` silently drops aggregate
+ * functions that are neither Average nor DeclarativeAggregate — confirm
+ * no supported aggregate falls into that bucket.
+ *
+ * @param logicalPlan plan of the query feeding the aggregate-table load
+ * @return the plan with Average expressions expanded to Sum + Count
+ */
+ private def transformAggregatePlan(logicalPlan: LogicalPlan): LogicalPlan = {
+ logicalPlan transform {
+ case aggregate@Aggregate(_, aExp, _) =>
+ val newExpressions = aExp flatMap {
+ case alias@Alias(attrExpression: AggregateExpression, _) =>
+ attrExpression.aggregateFunction flatMap {
+ case Average(attr: AttributeReference) =>
+ Seq(Alias(attrExpression
+ .copy(aggregateFunction = Sum(attr.withName(attr.name)),
+ resultId = NamedExpression.newExprId),
+ attr.name)(),
+ Alias(attrExpression
+ .copy(aggregateFunction = Count(attr.withName(attr.name)),
+ resultId = NamedExpression.newExprId), attr.name)())
+ case Average(Cast(attr: AttributeReference, _)) =>
+ Seq(Alias(attrExpression
+ .copy(aggregateFunction = Sum(attr.withName(attr.name)),
+ resultId = NamedExpression.newExprId),
+ attr.name)(),
+ Alias(attrExpression
+ .copy(aggregateFunction = Count(attr.withName(attr.name)),
+ resultId = NamedExpression.newExprId), attr.name)())
+ case _: DeclarativeAggregate => Seq(alias)
+ case _ => Nil
+ }
+ case namedExpr: NamedExpression => Seq(namedExpr)
+ }
+ aggregate.copy(aggregateExpressions = newExpressions)
+ case plan: LogicalPlan => plan
+ }
+ }
}
case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index 97ea7f8..d17dd11 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.execution.SparkOptimizer
-import org.apache.spark.sql.execution.command.preaaggregate.DropPreAggregateTablePostListener
+import org.apache.spark.sql.execution.command.preaaggregate.{DropPreAggregateTablePostListener, LoadPostAggregateListener}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
import org.apache.spark.sql.internal.SQLConf
@@ -35,7 +35,7 @@ import org.apache.spark.sql.parser.CarbonSparkSqlParser
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.events.{DropTablePostEvent, Event, OperationListenerBus}
+import org.apache.carbondata.events.{DropTablePostEvent, LoadTablePostExecutionEvent, OperationListenerBus}
/**
* This class will have carbon catalog and refresh the relation from cache if the carbontable in
@@ -130,7 +130,8 @@ object CarbonSessionState {
def init(): Unit = {
OperationListenerBus.getInstance()
- .addListener(classOf[DropTablePostEvent], new DropPreAggregateTablePostListener)
+ .addListener(classOf[DropTablePostEvent], DropPreAggregateTablePostListener)
+ .addListener(classOf[LoadTablePostExecutionEvent], LoadPostAggregateListener)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index f2c8a0a..7d25efd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.execution.command._
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, SessionParams}
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
@@ -83,6 +84,8 @@ object CarbonSetCommand {
"property should be in \" carbon.input.segments.<database_name>" +
".<table_name>=<seg_id list> \" format.")
}
+ } else if (key.startsWith(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS)) {
+ sessionParams.addProperty(key.toLowerCase(), value)
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc0e6f1e/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java
index 2671393..7045101 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java
@@ -37,7 +37,6 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.processing.loading.DataField;
import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
import org.apache.carbondata.processing.loading.dictionary.DictionaryServerClientDictionary;
@@ -68,15 +67,12 @@ public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConvert
Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
CarbonTableIdentifier carbonTableIdentifier, String nullFormat, int index,
DictionaryClient client, boolean useOnePass, String storePath,
- Map<Object, Integer> localCache, boolean isEmptyBadRecord) throws IOException {
+ Map<Object, Integer> localCache, boolean isEmptyBadRecord,
+ DictionaryColumnUniqueIdentifier identifier) throws IOException {
this.index = index;
this.carbonDimension = (CarbonDimension) dataField.getColumn();
this.nullFormat = nullFormat;
this.isEmptyBadRecord = isEmptyBadRecord;
- DictionaryColumnUniqueIdentifier identifier =
- new DictionaryColumnUniqueIdentifier(carbonTableIdentifier,
- dataField.getColumn().getColumnIdentifier(), dataField.getColumn().getDataType(),
- CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier));
// if use one pass, use DictionaryServerClientDictionary
if (useOnePass) {