You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2017/07/26 14:37:11 UTC
carbondata git commit: [CARBONDATA-1323] Presto Optimization for Integration Layer
Repository: carbondata
Updated Branches:
refs/heads/master 06ddd82f6 -> f2bb8d380
[CARBONDATA-1323] Presto Optimization for Integration Layer
This closes #1190
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f2bb8d38
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f2bb8d38
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f2bb8d38
Branch: refs/heads/master
Commit: f2bb8d3804973e02a45fb4f1ac6cb458590af1e5
Parents: 06ddd82
Author: Bhavya Aggarwal <bh...@knoldus.com>
Authored: Fri Jul 21 13:06:46 2017 +0530
Committer: chenliang613 <ch...@apache.org>
Committed: Wed Jul 26 22:36:56 2017 +0800
----------------------------------------------------------------------
integration/presto/pom.xml | 27 +++++++
.../carbondata/presto/CarbondataPageSource.java | 75 +++++++++++++-----
.../presto/CarbondataRecordCursor.java | 83 +++++++++-----------
.../carbondata/presto/CarbondataRecordSet.java | 42 +++++-----
.../presto/CarbonDictionaryDecodeSupport.scala | 65 +++++++++++++++
5 files changed, 208 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f2bb8d38/integration/presto/pom.xml
----------------------------------------------------------------------
diff --git a/integration/presto/pom.xml b/integration/presto/pom.xml
index 25eb6a7..4619413 100644
--- a/integration/presto/pom.xml
+++ b/integration/presto/pom.xml
@@ -228,6 +228,33 @@
<skip>true</skip>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <version>2.15.2</version>
+ <executions>
+ <execution>
+ <id>compile</id>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <phase>compile</phase>
+ </execution>
+ <execution>
+ <id>testCompile</id>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ <phase>test</phase>
+ </execution>
+ <execution>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f2bb8d38/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
index 7c50c66..c03983e 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
@@ -27,7 +27,10 @@ import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.PageBuilder;
import com.facebook.presto.spi.RecordCursor;
import com.facebook.presto.spi.RecordSet;
+import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilder;
+import com.facebook.presto.spi.block.LazyBlock;
+import com.facebook.presto.spi.block.LazyBlockLoader;
import com.facebook.presto.spi.type.DecimalType;
import com.facebook.presto.spi.type.Type;
import io.airlift.slice.Slice;
@@ -51,21 +54,20 @@ public class CarbondataPageSource implements ConnectorPageSource {
private final PageBuilder pageBuilder;
private boolean closed;
private final char[] buffer = new char[100];
+ private Block[] blocks;
- public CarbondataPageSource(RecordSet recordSet)
- {
+ public CarbondataPageSource(RecordSet recordSet) {
this(requireNonNull(recordSet, "recordSet is null").getColumnTypes(), recordSet.cursor());
}
- public CarbondataPageSource(List<Type> types, RecordCursor cursor)
- {
+ public CarbondataPageSource(List<Type> types, RecordCursor cursor) {
this.cursor = requireNonNull(cursor, "cursor is null");
this.types = unmodifiableList(new ArrayList<>(requireNonNull(types, "types is null")));
this.pageBuilder = new PageBuilder(this.types);
+ this.blocks = new Block[types.size()];
}
- public RecordCursor getCursor()
- {
+ public RecordCursor getCursor() {
return cursor;
}
@@ -86,6 +88,9 @@ public class CarbondataPageSource implements ConnectorPageSource {
}
@Override public Page getNextPage() {
+ BlockBuilder output;
+ Page page;
+ int size = types.size();
if (!closed) {
int i;
for (i = 0; i < ROWS_PER_REQUEST; i++) {
@@ -98,8 +103,9 @@ public class CarbondataPageSource implements ConnectorPageSource {
}
pageBuilder.declarePosition();
- for (int column = 0; column < types.size(); column++) {
- BlockBuilder output = pageBuilder.getBlockBuilder(column);
+
+ for (int column = 0; column < size; column++) {
+ output = pageBuilder.getBlockBuilder(column);
if (cursor.isNull(column)) {
output.appendNull();
} else {
@@ -113,8 +119,7 @@ public class CarbondataPageSource implements ConnectorPageSource {
type.writeDouble(output, cursor.getDouble(column));
} else if (javaType == Slice.class) {
Slice slice = cursor.getSlice(column);
- if(type instanceof DecimalType)
- {
+ if (type instanceof DecimalType) {
if (isShortDecimal(type)) {
type.writeLong(output, parseLong((DecimalType) type, slice, 0, slice.length()));
} else {
@@ -127,6 +132,8 @@ public class CarbondataPageSource implements ConnectorPageSource {
type.writeObject(output, cursor.getObject(column));
}
}
+ blocks[column] = new LazyBlock(output.getPositionCount(),
+ new CarbonBlockLoader(output.build(), types.get(column)));
}
}
}
@@ -135,10 +142,16 @@ public class CarbondataPageSource implements ConnectorPageSource {
if (pageBuilder.isEmpty() || (!closed && !pageBuilder.isFull())) {
return null;
}
- Page page = pageBuilder.build();
+
+ if (blocks != null && blocks.length > 0) {
+ page = new Page(blocks[0].getPositionCount(), blocks);
+ } else {
+ page = pageBuilder.build();
+ }
+
pageBuilder.reset();
return page;
- }
+ }
@Override public long getSystemMemoryUsage() {
return cursor.getSystemMemoryUsage() + pageBuilder.getSizeInBytes();
@@ -150,29 +163,49 @@ public class CarbondataPageSource implements ConnectorPageSource {
}
- private long parseLong(DecimalType type, Slice slice, int offset, int length)
- {
+ private long parseLong(DecimalType type, Slice slice, int offset, int length) {
BigDecimal decimal = parseBigDecimal(type, slice, offset, length);
return decimal.unscaledValue().longValue();
}
-
- private Slice parseSlice(DecimalType type, Slice slice, int offset, int length)
- {
+ private Slice parseSlice(DecimalType type, Slice slice, int offset, int length) {
BigDecimal decimal = parseBigDecimal(type, slice, offset, length);
return encodeUnscaledValue(decimal.unscaledValue());
}
- private BigDecimal parseBigDecimal(DecimalType type, Slice slice, int offset, int length)
- {
+ private BigDecimal parseBigDecimal(DecimalType type, Slice slice, int offset, int length) {
checkArgument(length < buffer.length);
for (int i = 0; i < length; i++) {
buffer[i] = (char) slice.getByte(offset + i);
}
BigDecimal decimal = new BigDecimal(buffer, 0, length);
- checkState(decimal.scale() <= type.getScale(), "Read decimal value scale larger than column scale");
+ checkState(decimal.scale() <= type.getScale(),
+ "Read decimal value scale larger than column scale");
decimal = decimal.setScale(type.getScale(), HALF_UP);
- checkState(decimal.precision() <= type.getPrecision(), "Read decimal precision larger than column precision");
+ checkState(decimal.precision() <= type.getPrecision(),
+ "Read decimal precision larger than column precision");
return decimal;
}
+
+ /**
+ * Using the LazyBlockLoader
+ */
+ private final class CarbonBlockLoader implements LazyBlockLoader<LazyBlock> {
+ private boolean loaded;
+ private Block dataBlock;
+ private Type type;
+
+ public CarbonBlockLoader(Block dataBlock, Type type) {
+ this.dataBlock = dataBlock;
+ this.type = type;
+ }
+
+ @Override public void load(LazyBlock block) {
+ if (loaded) {
+ return;
+ }
+ block.setBlock(dataBlock);
+ loaded = true;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f2bb8d38/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
index 2e97dc0..d6b1422 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
@@ -17,29 +17,24 @@
package org.apache.carbondata.presto;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Timestamp;
+import java.util.List;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
import com.facebook.presto.spi.RecordCursor;
import com.facebook.presto.spi.type.DecimalType;
import com.facebook.presto.spi.type.Decimals;
import com.facebook.presto.spi.type.TimestampType;
import com.facebook.presto.spi.type.Type;
-import com.facebook.presto.spi.block.Block;
-import com.facebook.presto.spi.block.BlockBuilder;
-import com.facebook.presto.spi.block.BlockBuilderStatus;
-
-import com.google.common.base.Strings;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
-import io.airlift.slice.Slices;
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
-
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.List;
-
+import scala.Int;
+import scala.Tuple3;
import static com.facebook.presto.spi.type.BooleanType.BOOLEAN;
import static com.facebook.presto.spi.type.Decimals.isShortDecimal;
@@ -55,22 +50,24 @@ public class CarbondataRecordCursor implements RecordCursor {
private static final Logger log = Logger.get(CarbondataRecordCursor.class);
private final List<CarbondataColumnHandle> columnHandles;
- private List<String> fields;
+ private Object[] fields;
private CarbondataSplit split;
private CarbonIterator<Object[]> rowCursor;
- private CarbonReadSupport<Object[]> readSupport;
+ private CarbonDictionaryDecodeReaderSupport readSupport;
+ private Tuple3<DataType,Dictionary,Int>[] dictionary;
private long totalBytes;
private long nanoStart;
private long nanoEnd;
- public CarbondataRecordCursor(CarbonReadSupport<Object[]> readSupport,
+ public CarbondataRecordCursor(CarbonDictionaryDecodeReaderSupport readSupport,
CarbonIterator<Object[]> carbonIterator, List<CarbondataColumnHandle> columnHandles,
- CarbondataSplit split) {
+ CarbondataSplit split, Tuple3<DataType,Dictionary,Int>[] dictionaries ) {
this.rowCursor = carbonIterator;
this.columnHandles = columnHandles;
this.readSupport = readSupport;
this.totalBytes = 0;
+ this.dictionary = dictionaries;
}
@Override public long getTotalBytes() {
@@ -101,20 +98,8 @@ public class CarbondataRecordCursor implements RecordCursor {
}
if (rowCursor.hasNext()) {
- Object[] columns = readSupport.readRow(rowCursor.next());
- fields = new ArrayList<String>();
- if(columns != null && columns.length > 0)
- {
- for(Object value : columns){
- if(value != null )
- {
- fields.add(value.toString());
- } else {
- fields.add(null);
- }
- }
- }
- totalBytes += columns.length;
+ fields = readSupport.readRow(rowCursor.next(),dictionary);
+ totalBytes += fields.length;
return true;
}
return false;
@@ -122,22 +107,30 @@ public class CarbondataRecordCursor implements RecordCursor {
@Override public boolean getBoolean(int field) {
checkFieldType(field, BOOLEAN);
- return Boolean.parseBoolean(getFieldValue(field));
+ return (Boolean)getFieldValue(field);
}
@Override public long getLong(int field) {
- String timeStr = getFieldValue(field);
+ Object obj = getFieldValue(field);
+ Long timeStr = 0L;
+ if( obj instanceof Integer ){
+ timeStr = ((Integer)obj).longValue();
+ } else if( obj instanceof Long ) {
+ timeStr = (Long)obj;
+ } else {
+ timeStr = Math.round(Double.parseDouble(obj.toString()));
+ }
Type actual = getType(field);
if(actual instanceof TimestampType){
- return new Timestamp(Long.parseLong(timeStr)).getTime()/1000;
+ return new Timestamp(timeStr).getTime()/1000;
}
//suppose the
- return Math.round(Double.parseDouble(getFieldValue(field)));
+ return timeStr;
}
@Override public double getDouble(int field) {
checkFieldType(field, DOUBLE);
- return Double.parseDouble(getFieldValue(field));
+ return (Double)getFieldValue(field);
}
@Override public Slice getSlice(int field) {
@@ -150,8 +143,8 @@ public class CarbondataRecordCursor implements RecordCursor {
} else {
checkFieldType(field, DecimalType.createDecimalType());
}
- String fieldValue = getFieldValue(field);
- BigDecimal bigDecimalValue = new BigDecimal(fieldValue);
+ Object fieldValue = getFieldValue(field);
+ BigDecimal bigDecimalValue = new BigDecimal(fieldValue.toString());
if (isShortDecimal(decimalType)) {
return utf8Slice(Decimals.toString(bigDecimalValue.longValue(), actual.getScale()));
} else {
@@ -174,7 +167,7 @@ public class CarbondataRecordCursor implements RecordCursor {
}
} else {
checkFieldType(field, VARCHAR);
- return utf8Slice(getFieldValue(field));
+ return utf8Slice(getFieldValue(field).toString());
}
}
@@ -184,12 +177,12 @@ public class CarbondataRecordCursor implements RecordCursor {
@Override public boolean isNull(int field) {
checkArgument(field < columnHandles.size(), "Invalid field index");
- return Strings.isNullOrEmpty(getFieldValue(field));
+ return getFieldValue(field) == null;
}
- String getFieldValue(int field) {
+ Object getFieldValue(int field) {
checkState(fields != null, "Cursor has not been advanced yet");
- return fields.get(field);
+ return fields[field];
}
private void checkFieldType(int field, Type expected) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f2bb8d38/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
index d75cbfb..661e83f 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
@@ -17,9 +17,10 @@
package org.apache.carbondata.presto;
-import com.facebook.presto.spi.*;
-import com.facebook.presto.spi.predicate.TupleDomain;
-import com.facebook.presto.spi.type.Type;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.core.datastore.block.BlockletInfos;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
@@ -32,16 +33,20 @@ import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.scan.result.BatchResult;
import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator;
-import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
-import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
-//import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodedReadSupportImpl;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.RecordCursor;
+import com.facebook.presto.spi.RecordSet;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.facebook.presto.spi.type.Type;
+import scala.Tuple3;
import static org.apache.carbondata.presto.Types.checkType;
+//import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodedReadSupportImpl;
+
public class CarbondataRecordSet implements RecordSet {
private CarbonTable carbonTable;
@@ -53,7 +58,7 @@ public class CarbondataRecordSet implements RecordSet {
private List<CarbondataColumnHandle> columns;
private QueryExecutor queryExecutor;
- private CarbonReadSupport<Object[]> readSupport;
+ private CarbonDictionaryDecodeReaderSupport readSupport;
public CarbondataRecordSet(CarbonTable carbonTable, ConnectorSession session,
ConnectorSplit split, List<CarbondataColumnHandle> columns, QueryModel queryModel) {
@@ -63,7 +68,7 @@ public class CarbondataRecordSet implements RecordSet {
this.rebuildConstraints = this.split.getRebuildConstraints();
this.queryModel = queryModel;
this.columns = columns;
- this.readSupport = new DictionaryDecodeReadSupport();
+ this.readSupport = new CarbonDictionaryDecodeReaderSupport();
}
//todo support later
@@ -84,24 +89,25 @@ public class CarbondataRecordSet implements RecordSet {
tableBlockInfoList.add(new TableBlockInfo(split.getLocalInputSplit().getPath().toString(),
split.getLocalInputSplit().getStart(), split.getLocalInputSplit().getSegmentId(),
split.getLocalInputSplit().getLocations().toArray(new String[0]),
- split.getLocalInputSplit().getLength(),new BlockletInfos(),
+ split.getLocalInputSplit().getLength(), new BlockletInfos(),
//blockletInfos,
- ColumnarFormatVersion.valueOf(split.getLocalInputSplit().getVersion()),null));
+ ColumnarFormatVersion.valueOf(split.getLocalInputSplit().getVersion()), null));
queryModel.setTableBlockInfos(tableBlockInfoList);
queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
- //queryModel.setQueryId(queryModel.getQueryId() + "_" + split.getLocalInputSplit().getSegmentId());
try {
- readSupport
+
+ Tuple3[] dict = readSupport
.initialize(queryModel.getProjectionColumns(), queryModel.getAbsoluteTableIdentifier());
CarbonIterator<Object[]> carbonIterator =
new ChunkRowIterator((CarbonIterator<BatchResult>) queryExecutor.execute(queryModel));
- RecordCursor rc = new CarbondataRecordCursor(readSupport, carbonIterator, columns, split);
+ RecordCursor rc =
+ new CarbondataRecordCursor(readSupport, carbonIterator, columns, split, dict);
return rc;
} catch (QueryExecutionException e) {
- throw new RuntimeException(e.getMessage(), e);
- } catch (Exception ex) {
+ throw new RuntimeException(e.getMessage(), e);
+ } catch (Exception ex) {
throw new RuntimeException(ex.getMessage(), ex);
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f2bb8d38/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeSupport.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeSupport.scala b/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeSupport.scala
new file mode 100644
index 0000000..fbdfebd
--- /dev/null
+++ b/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeSupport.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.presto
+
+import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
+import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
+
+
+class CarbonDictionaryDecodeReaderSupport[T] {
+
+ def initialize(carbonColumns: Array[CarbonColumn],
+ absoluteTableIdentifier: AbsoluteTableIdentifier): Array[(DataType, Dictionary, Int)] = {
+
+ carbonColumns.zipWithIndex.filter(dictChecker(_)).map { carbonColumnWithIndex =>
+ val (carbonColumn, index) = carbonColumnWithIndex
+ val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
+ CacheProvider.getInstance()
+ .createCache(CacheType.FORWARD_DICTIONARY, absoluteTableIdentifier
+ .getStorePath)
+ val dict: Dictionary = forwardDictionaryCache
+ .get(new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier.getCarbonTableIdentifier,
+ carbonColumn.getColumnIdentifier,
+ carbonColumn.getDataType))
+ (carbonColumn.getDataType, dict, index)
+ }
+ }
+
+ private def dictChecker(carbonColumWithIndex: (CarbonColumn, Int)): Boolean = {
+ val (carbonColumn, _) = carbonColumWithIndex
+ if (!carbonColumn.hasEncoding(Encoding.DIRECT_DICTIONARY) && !carbonColumn.isComplex &&
+ carbonColumn.hasEncoding(Encoding.DICTIONARY)) {
+ true
+ } else {
+ false
+ }
+ }
+
+ def readRow(data: Array[Object],
+ dictionaries: Array[(DataType, Dictionary, Int)]): Array[Object] = {
+ dictionaries.foreach { (dictionary: (DataType, Dictionary, Int)) =>
+ val (_, dict, position) = dictionary
+ data(position) = dict.getDictionaryValueForKey(data(position).asInstanceOf[Int])
+ }
+ data
+ }
+
+}