Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/12 15:05:21 UTC
[45/50] [abbrv] carbondata git commit: [CARBONDATA-1271] Enhanced Performance for Hive Integration with Carbondata
[CARBONDATA-1271] Enhanced Performance for Hive Integration with Carbondata
This closes #1142
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cbe14197
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cbe14197
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cbe14197
Branch: refs/heads/datamap
Commit: cbe141976a53a558b84d6e31baf3ec54a9bc38cc
Parents: 285ce72
Author: Bhavya <bh...@knoldus.com>
Authored: Thu Jul 6 11:53:03 2017 +0530
Committer: chenliang613 <ch...@apache.org>
Committed: Wed Jul 12 17:40:11 2017 +0800
----------------------------------------------------------------------
.../core/stats/QueryStatisticsRecorderImpl.java | 80 +++---
.../carbondata/hadoop/CarbonInputFormat.java | 7 +-
.../carbondata/hive/CarbonArrayInspector.java | 4 -
.../hive/CarbonDictionaryDecodeReadSupport.java | 288 +++++++++++++++++++
.../carbondata/hive/CarbonHiveInputSplit.java | 23 +-
.../carbondata/hive/CarbonHiveRecordReader.java | 67 ++---
.../apache/carbondata/hive/CarbonHiveSerDe.java | 36 +--
.../hive/MapredCarbonInputFormat.java | 129 ++++++---
.../hive/server/HiveEmbeddedServer2.java | 1 +
9 files changed, 477 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java
index f84a674..ffb7d7f 100644
--- a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java
@@ -101,45 +101,47 @@ public class QueryStatisticsRecorderImpl implements QueryStatisticsRecorder, Ser
long scannedPages = 0;
try {
for (QueryStatistic statistic : queryStatistics) {
- switch (statistic.getMessage()) {
- case QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR:
- load_blocks_time += statistic.getTimeTaken();
- break;
- case QueryStatisticsConstants.SCAN_BLOCKlET_TIME:
- scan_blocks_time += statistic.getCount();
- break;
- case QueryStatisticsConstants.SCAN_BLOCKS_NUM:
- scan_blocks_num += statistic.getCount();
- break;
- case QueryStatisticsConstants.LOAD_DICTIONARY:
- load_dictionary_time += statistic.getTimeTaken();
- break;
- case QueryStatisticsConstants.RESULT_SIZE:
- result_size += statistic.getCount();
- break;
- case QueryStatisticsConstants.EXECUTOR_PART:
- total_executor_time += statistic.getTimeTaken();
- break;
- case QueryStatisticsConstants.TOTAL_BLOCKLET_NUM:
- total_blocklet = statistic.getCount();
- break;
- case QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM:
- valid_scan_blocklet = statistic.getCount();
- break;
- case QueryStatisticsConstants.VALID_PAGE_SCANNED:
- valid_pages_blocklet = statistic.getCount();
- break;
- case QueryStatisticsConstants.TOTAL_PAGE_SCANNED:
- total_pages = statistic.getCount();
- break;
- case QueryStatisticsConstants.READ_BLOCKlET_TIME:
- readTime = statistic.getCount();
- break;
- case QueryStatisticsConstants.PAGE_SCANNED:
- scannedPages = statistic.getCount();
- break;
- default:
- break;
+ if (statistic.getMessage() != null) {
+ switch (statistic.getMessage()) {
+ case QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR:
+ load_blocks_time += statistic.getTimeTaken();
+ break;
+ case QueryStatisticsConstants.SCAN_BLOCKlET_TIME:
+ scan_blocks_time += statistic.getCount();
+ break;
+ case QueryStatisticsConstants.SCAN_BLOCKS_NUM:
+ scan_blocks_num += statistic.getCount();
+ break;
+ case QueryStatisticsConstants.LOAD_DICTIONARY:
+ load_dictionary_time += statistic.getTimeTaken();
+ break;
+ case QueryStatisticsConstants.RESULT_SIZE:
+ result_size += statistic.getCount();
+ break;
+ case QueryStatisticsConstants.EXECUTOR_PART:
+ total_executor_time += statistic.getTimeTaken();
+ break;
+ case QueryStatisticsConstants.TOTAL_BLOCKLET_NUM:
+ total_blocklet = statistic.getCount();
+ break;
+ case QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM:
+ valid_scan_blocklet = statistic.getCount();
+ break;
+ case QueryStatisticsConstants.VALID_PAGE_SCANNED:
+ valid_pages_blocklet = statistic.getCount();
+ break;
+ case QueryStatisticsConstants.TOTAL_PAGE_SCANNED:
+ total_pages = statistic.getCount();
+ break;
+ case QueryStatisticsConstants.READ_BLOCKlET_TIME:
+ readTime = statistic.getCount();
+ break;
+ case QueryStatisticsConstants.PAGE_SCANNED:
+ scannedPages = statistic.getCount();
+ break;
+ default:
+ break;
+ }
}
}
String headers =
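
The null check wrapped around the switch above matters because Java's switch on a String dereferences it: a statistic recorded without a message would previously throw NullPointerException and abort statistics logging for the whole query. A minimal standalone sketch of the pattern (illustrative only, not CarbonData code):

// Illustrative: switching on a null String throws NullPointerException,
// which is why the recorder now guards statistic.getMessage().
public class NullGuardedSwitchDemo {
  static long loadBlocksTime = 0;

  static void accumulate(String message, long timeTaken) {
    if (message != null) {                  // the guard added in the hunk above
      switch (message) {
        case "LOAD_BLOCKS_EXECUTOR":
          loadBlocksTime += timeTaken;
          break;
        default:
          break;                            // unknown messages are ignored
      }
    }
    // a null message is skipped instead of crashing statistics collection
  }

  public static void main(String[] args) {
    accumulate("LOAD_BLOCKS_EXECUTOR", 42L);
    accumulate(null, 10L);                  // would have thrown NPE before the fix
    System.out.println(loadBlocksTime);     // prints 42
  }
}
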
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 1e69648..16b5d69 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -444,9 +444,14 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
}
}
}
+
+ // For Hive integration if we have to get the stats we have to fetch hive.query.id
+ String query_id = job.getConfiguration().get("query.id") != null ?
+ job.getConfiguration().get("query.id") :
+ job.getConfiguration().get("hive.query.id");
statistic
.addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
- recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
+ recorder.recordStatisticsForDriver(statistic, query_id);
return resultFilterredBlocks;
} finally {
// clean up the access count for a segment as soon as its usage is complete so that in
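
When Hive drives the query, query.id is not present in the job configuration, so the recorder key now falls back to hive.query.id. A hedged sketch of that lookup using only Hadoop's Configuration API (the property value below is made up):

import org.apache.hadoop.conf.Configuration;

// Illustrative only: resolve the statistics key with the query.id -> hive.query.id
// fallback added in the hunk above.
public class QueryIdLookupDemo {
  static String resolveQueryId(Configuration conf) {
    String queryId = conf.get("query.id");        // set on the Spark/Carbon path
    if (queryId == null) {
      queryId = conf.get("hive.query.id");        // set by Hive when it runs the query
    }
    return queryId;
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    conf.set("hive.query.id", "hive_20170712_0001");   // hypothetical value
    System.out.println(resolveQueryId(conf));           // prints hive_20170712_0001
  }
}
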
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java
index 49e068a..b26c959 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java
@@ -18,7 +18,6 @@ package org.apache.carbondata.hive;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -122,9 +121,6 @@ class CarbonArrayInspector implements SettableListObjectInspector {
final Writable[] array = ((ArrayWritable) subObj).get();
final List<Writable> list = Arrays.asList(array);
-
- Collections.addAll(list, array);
-
return list;
}
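
The removed Collections.addAll call was a latent bug rather than a cleanup: Arrays.asList returns a fixed-size list backed by the original array, so adding to it fails at runtime (and would have duplicated every element anyway). A self-contained illustration:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative only: demonstrates why adding to an Arrays.asList view fails.
public class FixedSizeListDemo {
  public static void main(String[] args) {
    Integer[] array = {1, 2, 3};
    List<Integer> list = Arrays.asList(array);    // already contains 1, 2, 3
    try {
      Collections.addAll(list, array);            // the call removed above
    } catch (UnsupportedOperationException e) {
      System.out.println("fixed-size list rejects add: " + e);
    }
    System.out.println(list);                     // [1, 2, 3]
  }
}
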
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java
new file mode 100644
index 0000000..bc66d49
--- /dev/null
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hive;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.util.GenericArrayData;
+
+/**
+ * This is the class to decode dictionary encoded column data back to its original value.
+ */
+public class CarbonDictionaryDecodeReadSupport<T> implements CarbonReadSupport<T> {
+
+ protected Dictionary[] dictionaries;
+
+ protected DataType[] dataTypes;
+ /**
+ * carbon columns
+ */
+ protected CarbonColumn[] carbonColumns;
+
+ protected Writable[] writableArr;
+
+ /**
+ * This initialization is done inside executor task
+ * for column dictionary involved in decoding.
+ *
+ * @param carbonColumns column list
+ * @param absoluteTableIdentifier table identifier
+ */
+ @Override public void initialize(CarbonColumn[] carbonColumns,
+ AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException {
+ this.carbonColumns = carbonColumns;
+ dictionaries = new Dictionary[carbonColumns.length];
+ dataTypes = new DataType[carbonColumns.length];
+ for (int i = 0; i < carbonColumns.length; i++) {
+ if (carbonColumns[i].hasEncoding(Encoding.DICTIONARY) && !carbonColumns[i]
+ .hasEncoding(Encoding.DIRECT_DICTIONARY) && !carbonColumns[i].isComplex()) {
+ CacheProvider cacheProvider = CacheProvider.getInstance();
+ Cache<DictionaryColumnUniqueIdentifier, Dictionary> forwardDictionaryCache = cacheProvider
+ .createCache(CacheType.FORWARD_DICTIONARY, absoluteTableIdentifier.getStorePath());
+ dataTypes[i] = carbonColumns[i].getDataType();
+ dictionaries[i] = forwardDictionaryCache.get(
+ new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier.getCarbonTableIdentifier(),
+ carbonColumns[i].getColumnIdentifier(), dataTypes[i]));
+ } else {
+ dataTypes[i] = carbonColumns[i].getDataType();
+ }
+ }
+ }
+
+ @Override public T readRow(Object[] data) {
+ assert (data.length == dictionaries.length);
+ writableArr = new Writable[data.length];
+ for (int i = 0; i < dictionaries.length; i++) {
+ if (dictionaries[i] != null) {
+ data[i] = dictionaries[i].getDictionaryValueForKey((int) data[i]);
+ }
+ try {
+ writableArr[i] = createWritableObject(data[i], carbonColumns[i]);
+ } catch (IOException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ return (T) writableArr;
+ }
+
+ /**
+ * to book keep the dictionary cache or update access count for each
+ * column involved during decode, to facilitate LRU cache policy if memory
+ * threshold is reached
+ */
+ @Override public void close() {
+ if (dictionaries == null) {
+ return;
+ }
+ for (int i = 0; i < dictionaries.length; i++) {
+ CarbonUtil.clearDictionaryCache(dictionaries[i]);
+ }
+ }
+
+ /**
+ * To Create the Writable from the CarbonData data
+ *
+ * @param obj
+ * @param carbonColumn
+ * @return
+ * @throws IOException
+ */
+ private Writable createWritableObject(Object obj, CarbonColumn carbonColumn) throws IOException {
+ DataType dataType = carbonColumn.getDataType();
+ switch (dataType) {
+ case STRUCT:
+ return createStruct(obj, carbonColumn);
+ case ARRAY:
+ return createArray(obj, carbonColumn);
+ default:
+ return createWritablePrimitive(obj, carbonColumn);
+ }
+ }
+
+ /**
+ * Create Array Data for Array Datatype
+ *
+ * @param obj
+ * @param carbonColumn
+ * @return
+ * @throws IOException
+ */
+ private ArrayWritable createArray(Object obj, CarbonColumn carbonColumn) throws IOException {
+ if (obj instanceof GenericArrayData) {
+ Object[] objArray = ((GenericArrayData) obj).array();
+ List<CarbonDimension> childCarbonDimensions = null;
+ CarbonDimension arrayDimension = null;
+ if (carbonColumn.isDimension() && carbonColumn.getColumnSchema().getNumberOfChild() > 0) {
+ childCarbonDimensions = ((CarbonDimension) carbonColumn).getListOfChildDimensions();
+ arrayDimension = childCarbonDimensions.get(0);
+ }
+ List array = new ArrayList();
+ if (objArray != null) {
+ for (int i = 0; i < objArray.length; i++) {
+ Object curObj = objArray[i];
+ Writable newObj = createWritableObject(curObj, arrayDimension);
+ array.add(newObj);
+ }
+ }
+ if (array.size() > 0) {
+ ArrayWritable subArray = new ArrayWritable(Writable.class,
+ (Writable[]) array.toArray(new Writable[array.size()]));
+ return new ArrayWritable(Writable.class, new Writable[] { subArray });
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Create the Struct data for the Struct Datatype
+ *
+ * @param obj
+ * @param carbonColumn
+ * @return
+ * @throws IOException
+ */
+ private ArrayWritable createStruct(Object obj, CarbonColumn carbonColumn) throws IOException {
+ if (obj instanceof GenericInternalRow) {
+ Object[] objArray = ((GenericInternalRow) obj).values();
+ List<CarbonDimension> childCarbonDimensions = null;
+ if (carbonColumn.isDimension() && carbonColumn.getColumnSchema().getNumberOfChild() > 0) {
+ childCarbonDimensions = ((CarbonDimension) carbonColumn).getListOfChildDimensions();
+ }
+ Writable[] arr = new Writable[objArray.length];
+ for (int i = 0; i < objArray.length; i++) {
+
+ arr[i] = createWritableObject(objArray[i], childCarbonDimensions.get(i));
+ }
+ return new ArrayWritable(Writable.class, arr);
+ }
+ throw new IOException("DataType not supported in Carbondata");
+ }
+
+ /**
+ * This method will create the Writable Objects for primitives.
+ *
+ * @param obj
+ * @param carbonColumn
+ * @return
+ * @throws IOException
+ */
+ private Writable createWritablePrimitive(Object obj, CarbonColumn carbonColumn)
+ throws IOException {
+ DataType dataType = carbonColumn.getDataType();
+ if (obj == null) {
+ return null;
+ }
+ switch (dataType) {
+ case NULL:
+ return null;
+ case DOUBLE:
+ return new DoubleWritable((double) obj);
+ case INT:
+ return new IntWritable((int) obj);
+ case LONG:
+ return new LongWritable((long) obj);
+ case SHORT:
+ return new ShortWritable((Short) obj);
+ case DATE:
+ return new DateWritable(new Date((Integer) obj));
+ case TIMESTAMP:
+ return new TimestampWritable(new Timestamp((long) obj));
+ case STRING:
+ return new Text(obj.toString());
+ case DECIMAL:
+ return new HiveDecimalWritable(
+ HiveDecimal.create(new java.math.BigDecimal(obj.toString())));
+ }
+ throw new IOException("Unknown primitive : " + dataType.getName());
+ }
+
+ /**
+ * If we need to use the same Writable[] then we can use this method
+ *
+ * @param writable
+ * @param obj
+ * @param carbonColumn
+ * @throws IOException
+ */
+ private void setPrimitive(Writable writable, Object obj, CarbonColumn carbonColumn)
+ throws IOException {
+ DataType dataType = carbonColumn.getDataType();
+ if (obj == null) {
+ writable.write(null);
+ }
+ switch (dataType) {
+ case DOUBLE:
+ ((DoubleWritable) writable).set((double) obj);
+ break;
+ case INT:
+ ((IntWritable) writable).set((int) obj);
+ break;
+ case LONG:
+ ((LongWritable) writable).set((long) obj);
+ break;
+ case SHORT:
+ ((ShortWritable) writable).set((short) obj);
+ break;
+ case DATE:
+ ((DateWritable) writable).set(new Date((Long) obj));
+ break;
+ case TIMESTAMP:
+ ((TimestampWritable) writable).set(new Timestamp((long) obj));
+ break;
+ case STRING:
+ ((Text) writable).set(obj.toString());
+ break;
+ case DECIMAL:
+ ((HiveDecimalWritable) writable)
+ .set(HiveDecimal.create(new java.math.BigDecimal(obj.toString())));
+ }
+ }
+
+}
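
For readers skimming the new read support: the core idea is a forward-dictionary lookup that swaps surrogate keys stored in the data back to their original values before the row is wrapped into Hive Writables. A minimal sketch with a plain java.util.Map standing in for CarbonData's Dictionary cache (column names and values are hypothetical):

import java.util.HashMap;
import java.util.Map;

// Illustrative only: not the CarbonData Dictionary API.
public class ForwardDictionaryDemo {
  public static void main(String[] args) {
    Map<Integer, String> countryDictionary = new HashMap<>();
    countryDictionary.put(1, "india");
    countryDictionary.put(2, "china");

    Object[] encodedRow = {2, 2000.5};                            // dictionary key + measure
    Object[] decodedRow = new Object[encodedRow.length];
    decodedRow[0] = countryDictionary.get((int) encodedRow[0]);   // surrogate key -> value
    decodedRow[1] = encodedRow[1];                                // measures pass through

    System.out.println(decodedRow[0] + ", " + decodedRow[1]);     // china, 2000.5
  }
}
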
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java
index bfe4d27..b922295 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java
@@ -113,8 +113,7 @@ public class CarbonHiveInputSplit extends FileSplit
}
public static CarbonHiveInputSplit from(String segmentId, FileSplit split,
- ColumnarFormatVersion version)
- throws IOException {
+ ColumnarFormatVersion version) throws IOException {
return new CarbonHiveInputSplit(segmentId, split.getPath(), split.getStart(), split.getLength(),
split.getLocations(), version);
}
@@ -151,8 +150,7 @@ public class CarbonHiveInputSplit extends FileSplit
return segmentId;
}
- @Override
- public void readFields(DataInput in) throws IOException {
+ @Override public void readFields(DataInput in) throws IOException {
super.readFields(in);
this.segmentId = in.readUTF();
this.version = ColumnarFormatVersion.valueOf(in.readShort());
@@ -162,10 +160,10 @@ public class CarbonHiveInputSplit extends FileSplit
for (int i = 0; i < numInvalidSegment; i++) {
invalidSegments.add(in.readUTF());
}
+ this.numberOfBlocklets = in.readInt();
}
- @Override
- public void write(DataOutput out) throws IOException {
+ @Override public void write(DataOutput out) throws IOException {
super.write(out);
out.writeUTF(segmentId);
out.writeShort(version.number());
@@ -174,6 +172,7 @@ public class CarbonHiveInputSplit extends FileSplit
for (String invalidSegment : invalidSegments) {
out.writeUTF(invalidSegment);
}
+ out.writeInt(numberOfBlocklets);
}
public List<String> getInvalidSegments() {
@@ -213,8 +212,7 @@ public class CarbonHiveInputSplit extends FileSplit
return bucketId;
}
- @Override
- public int compareTo(Distributable o) {
+ @Override public int compareTo(Distributable o) {
if (o == null) {
return -1;
}
@@ -264,18 +262,15 @@ public class CarbonHiveInputSplit extends FileSplit
return 0;
}
- @Override
- public String getBlockPath() {
+ @Override public String getBlockPath() {
return getPath().getName();
}
- @Override
- public List<Long> getMatchedBlocklets() {
+ @Override public List<Long> getMatchedBlocklets() {
return null;
}
- @Override
- public boolean fullScan() {
+ @Override public boolean fullScan() {
return true;
}
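
The split change restores the Writable contract: whatever write() appends, readFields() must consume in the same order, otherwise a split deserialized on the task side reads garbage after segmentId and numberOfBlocklets is silently lost. A stripped-down round-trip sketch (not the real split class):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;

// Illustrative only: a minimal Writable carrying the two fields of interest.
public class BlockletCountSplitDemo implements Writable {
  private String segmentId = "0";
  private int numberOfBlocklets = 0;

  @Override public void write(DataOutput out) throws IOException {
    out.writeUTF(segmentId);
    out.writeInt(numberOfBlocklets);    // newly serialized field
  }

  @Override public void readFields(DataInput in) throws IOException {
    segmentId = in.readUTF();
    numberOfBlocklets = in.readInt();   // read back in the same order
  }

  public static void main(String[] args) throws IOException {
    BlockletCountSplitDemo original = new BlockletCountSplitDemo();
    original.numberOfBlocklets = 7;

    DataOutputBuffer out = new DataOutputBuffer();
    original.write(out);

    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());

    BlockletCountSplitDemo copy = new BlockletCountSplitDemo();
    copy.readFields(in);
    System.out.println(copy.numberOfBlocklets);   // prints 7 only when both sides agree
  }
}
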
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
index e4df02e..2a92185 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
@@ -62,6 +62,8 @@ class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
private ArrayWritable valueObj = null;
private CarbonObjectInspector objInspector;
+ private long recordReaderCounter = 0;
+ private int[] columnIds;
public CarbonHiveRecordReader(QueryModel queryModel, CarbonReadSupport<ArrayWritable> readSupport,
InputSplit inputSplit, JobConf jobConf) throws IOException {
@@ -88,17 +90,12 @@ class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
} catch (QueryExecutionException e) {
throw new IOException(e.getMessage(), e.getCause());
}
- if (valueObj == null) {
- valueObj =
- new ArrayWritable(Writable.class, new Writable[queryModel.getProjectionColumns().length]);
- }
-
final TypeInfo rowTypeInfo;
final List<String> columnNames;
List<TypeInfo> columnTypes;
// Get column names and sort order
final String colIds = conf.get("hive.io.file.readcolumn.ids");
- final String columnNameProperty = conf.get("hive.io.file.readcolumn.names");
+ final String columnNameProperty = conf.get(serdeConstants.LIST_COLUMNS);
final String columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES);
if (columnNameProperty.length() == 0) {
@@ -111,47 +108,39 @@ class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
} else {
columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
}
+
+ if (valueObj == null) {
+ valueObj = new ArrayWritable(Writable.class, new Writable[columnTypes.size()]);
+ }
+
if (!colIds.equals("")) {
String[] arraySelectedColId = colIds.split(",");
List<TypeInfo> reqColTypes = new ArrayList<TypeInfo>();
-
- for (String anArrayColId : arraySelectedColId) {
- reqColTypes.add(columnTypes.get(Integer.parseInt(anArrayColId)));
+ columnIds = new int[arraySelectedColId.length];
+ int columnId = 0;
+ for (int j = 0; j < arraySelectedColId.length; j++) {
+ columnId = Integer.parseInt(arraySelectedColId[j]);
+ columnIds[j] = columnId;
}
- // Create row related objects
- rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, reqColTypes);
- this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
- } else {
- rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
- this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
}
+
+ rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
+ this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
}
@Override public boolean next(Void aVoid, ArrayWritable value) throws IOException {
if (carbonIterator.hasNext()) {
Object obj = readSupport.readRow(carbonIterator.next());
- ArrayWritable tmpValue;
- try {
- tmpValue = createArrayWritable(obj);
- } catch (SerDeException se) {
- throw new IOException(se.getMessage(), se.getCause());
- }
-
- if (value != tmpValue) {
- final Writable[] arrValue = value.get();
- final Writable[] arrCurrent = tmpValue.get();
- if (valueObj != null && arrValue.length == arrCurrent.length) {
- System.arraycopy(arrCurrent, 0, arrValue, 0, arrCurrent.length);
- } else {
- if (arrValue.length != arrCurrent.length) {
- throw new IOException(
- "CarbonHiveInput : size of object differs. Value" + " size : " + arrValue.length
- + ", Current Object size : " + arrCurrent.length);
- } else {
- throw new IOException("CarbonHiveInput can not support RecordReaders that"
- + " don't return same key & value & value is null");
- }
+ recordReaderCounter++;
+ Writable[] objArray = (Writable[]) obj;
+ Writable[] sysArray = new Writable[value.get().length];
+ if (columnIds != null && columnIds.length > 0 && objArray.length == columnIds.length) {
+ for (int i = 0; i < columnIds.length; i++) {
+ sysArray[columnIds[i]] = objArray[i];
}
+ value.set(sysArray);
+ } else {
+ value.set(objArray);
}
return true;
} else {
@@ -159,10 +148,6 @@ class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
}
}
- private ArrayWritable createArrayWritable(Object obj) throws SerDeException {
- return createStruct(obj, objInspector);
- }
-
@Override public Void createKey() {
return null;
}
@@ -172,7 +157,7 @@ class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
}
@Override public long getPos() throws IOException {
- return 0;
+ return recordReaderCounter;
}
@Override public float getProgress() throws IOException {
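
next() now scatters the projected values into the full-width Hive row using the ids parsed from hive.io.file.readcolumn.ids, and getPos() reports how many records have been served. A small sketch of the column mapping with hypothetical values:

import java.util.Arrays;

// Illustrative only: maps projection-ordered values back to their table positions.
public class ProjectionMappingDemo {
  public static void main(String[] args) {
    int[] columnIds = {2, 0};                 // Hive asked for columns 2 and 0
    Object[] projected = {"india", 42};       // values arrive in projection order
    Object[] fullRow = new Object[4];         // the table has 4 columns

    for (int i = 0; i < columnIds.length; i++) {
      fullRow[columnIds[i]] = projected[i];   // place each value at its table position
    }
    System.out.println(Arrays.toString(fullRow));   // [42, null, india, null]
  }
}
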
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
index f66f3ed..2980ad3 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
@@ -79,11 +79,9 @@ class CarbonHiveSerDe extends AbstractSerDe {
final TypeInfo rowTypeInfo;
final List<String> columnNames;
- final List<String> reqColNames;
final List<TypeInfo> columnTypes;
// Get column names and sort order
assert configuration != null;
- final String colIds = configuration.get("hive.io.file.readcolumn.ids");
final String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
final String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
@@ -98,29 +96,17 @@ class CarbonHiveSerDe extends AbstractSerDe {
} else {
columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
}
- if (colIds != null && !colIds.equals("")) {
- reqColNames = new ArrayList<String>();
-
- String[] arraySelectedColId = colIds.split(",");
- List<TypeInfo> reqColTypes = new ArrayList<TypeInfo>();
- for (String anArrayColId : arraySelectedColId) {
- reqColNames.add(columnNames.get(Integer.parseInt(anArrayColId)));
- reqColTypes.add(columnTypes.get(Integer.parseInt(anArrayColId)));
- }
- // Create row related objects
- rowTypeInfo = TypeInfoFactory.getStructTypeInfo(reqColNames, reqColTypes);
- this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
- }
- else {
- // Create row related objects
- rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
- this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
-
- // Stats part
- serializedSize = 0;
- deserializedSize = 0;
- status = LAST_OPERATION.UNKNOWN;
- }
+
+
+
+ // Create row related objects
+ rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
+ this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
+
+ // Stats part
+ serializedSize = 0;
+ deserializedSize = 0;
+ status = LAST_OPERATION.UNKNOWN;
}
@Override public Class<? extends Writable> getSerializedClass() {
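
The SerDe now always builds its object inspector from the complete table schema (LIST_COLUMNS / LIST_COLUMN_TYPES) and leaves projection handling to the record reader. A minimal sketch using Hive's TypeInfo utilities (column names and types are made up):

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

// Illustrative only: build the struct type the inspector is created from.
public class RowTypeInfoDemo {
  public static void main(String[] args) {
    List<String> columnNames = Arrays.asList("id", "name", "salary");
    List<TypeInfo> columnTypes =
        TypeInfoUtils.getTypeInfosFromTypeString("int:string:double");
    TypeInfo rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
    System.out.println(rowTypeInfo.getTypeName());   // struct covering every table column
  }
}
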
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
index 7a1c9db..58f25c9 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
@@ -17,12 +17,12 @@
package org.apache.carbondata.hive;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
@@ -31,8 +31,11 @@ import org.apache.carbondata.hadoop.CarbonInputFormat;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
+import org.apache.carbondata.hadoop.util.SchemaReader;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.io.ArrayWritable;
@@ -42,9 +45,11 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.StringUtils;
public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable>
implements InputFormat<Void, ArrayWritable>, CombineHiveInputFormat.AvoidSplitCombination {
+ private static final String CARBON_TABLE = "mapreduce.input.carboninputformat.table";
@Override public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
org.apache.hadoop.mapreduce.JobContext jobContext = Job.getInstance(jobConf);
@@ -63,47 +68,64 @@ public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable>
@Override
public RecordReader<Void, ArrayWritable> getRecordReader(InputSplit inputSplit, JobConf jobConf,
Reporter reporter) throws IOException {
- QueryModel queryModel = getQueryModel(jobConf);
- CarbonReadSupport<ArrayWritable> readSupport = getReadSupportClass(jobConf);
+ String path = null;
+ if (inputSplit instanceof CarbonHiveInputSplit) {
+ path = ((CarbonHiveInputSplit) inputSplit).getPath().toString();
+ }
+ QueryModel queryModel = getQueryModel(jobConf, path);
+ CarbonReadSupport<ArrayWritable> readSupport = new CarbonDictionaryDecodeReadSupport<>();
return new CarbonHiveRecordReader(queryModel, readSupport, inputSplit, jobConf);
}
- private QueryModel getQueryModel(Configuration configuration) throws IOException {
- CarbonTable carbonTable = getCarbonTable(configuration);
+ /**
+ * this method will read the schema from the physical file and populate into CARBON_TABLE
+ *
+ * @param configuration
+ * @throws IOException
+ */
+ private static void populateCarbonTable(Configuration configuration, String paths)
+ throws IOException {
+ String dirs = configuration.get(INPUT_DIR, "");
+ String[] inputPaths = StringUtils.split(dirs);
+ String validInputPath = null;
+ if (inputPaths.length == 0) {
+ throw new InvalidPathException("No input paths specified in job");
+ } else {
+ if (paths != null) {
+ for (String inputPath : inputPaths) {
+ if (paths.startsWith(inputPath)) {
+ validInputPath = inputPath;
+ break;
+ }
+ }
+ }
+ }
+ AbsoluteTableIdentifier absoluteTableIdentifier =
+ AbsoluteTableIdentifier.fromTablePath(validInputPath);
+ // read the schema file to get the absoluteTableIdentifier having the correct table id
+ // persisted in the schema
+ CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier);
+ setCarbonTable(configuration, carbonTable);
+ }
+
+ private static CarbonTable getCarbonTable(Configuration configuration, String path)
+ throws IOException {
+ populateCarbonTable(configuration, path);
+ // read it from schema file in the store
+ String carbonTableStr = configuration.get(CARBON_TABLE);
+ return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
+ }
+
+ private QueryModel getQueryModel(Configuration configuration, String path) throws IOException {
+ CarbonTable carbonTable = getCarbonTable(configuration, path);
// getting the table absoluteTableIdentifier from the carbonTable
// to avoid unnecessary deserialization
StringBuilder colNames = new StringBuilder();
AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
- // query plan includes projection column
- String projection = getColumnProjection(configuration);
- if (projection == null) {
- projection = configuration.get("hive.io.file.readcolumn.names");
- }
- if (projection.equals("")) {
- List<CarbonDimension> carbonDimensionList = carbonTable.getAllDimensions();
- List<CarbonMeasure> carbonMeasureList = carbonTable.getAllMeasures();
-
- for (CarbonDimension aCarbonDimensionList : carbonDimensionList) {
- colNames = new StringBuilder((colNames + (aCarbonDimensionList.getColName())) + ",");
- }
- if (carbonMeasureList.size() < 1) {
- colNames = new StringBuilder(colNames.substring(0, colNames.lastIndexOf(",")));
- }
- for (int index = 0; index < carbonMeasureList.size(); index++) {
- if (!carbonMeasureList.get(index).getColName().equals("default_dummy_measure")) {
- if (index == carbonMeasureList.size() - 1) {
- colNames.append(carbonMeasureList.get(index).getColName());
- } else {
- colNames =
- new StringBuilder((colNames + (carbonMeasureList.get(index).getColName())) + ",");
- }
- }
- }
- projection = colNames.toString().trim();
- configuration.set("hive.io.file.readcolumn.names", colNames.toString());
- }
+ String projection = getProjection(configuration, carbonTable,
+ identifier.getCarbonTableIdentifier().getTableName());
CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, projection);
QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, carbonTable);
// set the filter to the query model in order to filter blocklet before scan
@@ -115,6 +137,45 @@ public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable>
return queryModel;
}
+ /**
+ * Return the Projection for the CarbonQuery.
+ *
+ * @param configuration
+ * @param carbonTable
+ * @param tableName
+ * @return
+ */
+ private String getProjection(Configuration configuration, CarbonTable carbonTable,
+ String tableName) {
+ // query plan includes projection column
+ String projection = getColumnProjection(configuration);
+ if (projection == null) {
+ projection = configuration.get("hive.io.file.readcolumn.names");
+ }
+ List<CarbonColumn> carbonColumns = carbonTable.getCreateOrderColumn(tableName);
+ List<String> carbonColumnNames = new ArrayList<>();
+ StringBuilder allColumns = new StringBuilder();
+ StringBuilder projectionColumns = new StringBuilder();
+ for (CarbonColumn column : carbonColumns) {
+ carbonColumnNames.add(column.getColName());
+ allColumns.append(column.getColName() + ",");
+ }
+
+ if (!projection.equals("")) {
+ String[] columnNames = projection.split(",");
+ //verify that the columns parsed by Hive exist in the table
+ for (String col : columnNames) {
+ //show columns command will return these data
+ if (carbonColumnNames.contains(col)) {
+ projectionColumns.append(col + ",");
+ }
+ }
+ return projectionColumns.substring(0, projectionColumns.lastIndexOf(","));
+ } else {
+ return allColumns.substring(0, allColumns.lastIndexOf(","));
+ }
+ }
+
@Override public boolean shouldSkipCombine(Path path, Configuration conf) throws IOException {
return true;
}
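
getProjection keeps only the requested columns that actually exist in the table (Hive may hand over virtual or otherwise unknown column names) and falls back to all columns when the projection is empty. An illustrative sketch with hypothetical column names:

import java.util.Arrays;
import java.util.List;

// Illustrative only: mirrors the filtering logic, not the CarbonData API.
public class ProjectionFilterDemo {
  static String buildProjection(String requested, List<String> tableColumns) {
    if (requested == null || requested.equals("")) {
      return String.join(",", tableColumns);        // empty projection -> all columns
    }
    StringBuilder projection = new StringBuilder();
    for (String col : requested.split(",")) {
      if (tableColumns.contains(col)) {             // drop names the table does not have
        projection.append(col).append(",");
      }
    }
    return projection.substring(0, projection.lastIndexOf(","));
  }

  public static void main(String[] args) {
    List<String> tableColumns = Arrays.asList("id", "name", "salary");
    System.out.println(buildProjection("name,unknown_col,id", tableColumns)); // name,id
    System.out.println(buildProjection("", tableColumns));                    // id,name,salary
  }
}
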
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java b/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java
index d8705f8..ae931fb 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java
@@ -130,6 +130,7 @@ public class HiveEmbeddedServer2 {
conf.set("hive.added.files.path", "");
conf.set("hive.added.archives.path", "");
conf.set("fs.default.name", "file:///");
+ conf.set(HiveConf.ConfVars.SUBMITLOCALTASKVIACHILD.varname, "false");
// clear mapred.job.tracker - Hadoop defaults to 'local' if not defined. Hive however expects
// this to be set to 'local' - if it's not, it does a remote execution (i.e. no child JVM)
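
The extra HiveEmbeddedServer2 setting keeps Hive's local tasks inside the test JVM instead of forking a child process, which is what hive.exec.submit.local.task.via.child controls. A minimal, illustrative way to set it through HiveConf:

import org.apache.hadoop.hive.conf.HiveConf;

// Illustrative only: shows the property being set, as the embedded server does above.
public class LocalTaskConfDemo {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    conf.set(HiveConf.ConfVars.SUBMITLOCALTASKVIACHILD.varname, "false");
    System.out.println(conf.get(HiveConf.ConfVars.SUBMITLOCALTASKVIACHILD.varname));
  }
}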