You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2017/05/24 10:49:05 UTC
[2/5] carbondata git commit: resolved rebase conflict for presto
branch
resolved rebase conflict for presto branch
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4b682f54
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4b682f54
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4b682f54
Branch: refs/heads/master
Commit: 4b682f541976cd3e56b486e3479f47096cc1bcd9
Parents: a60775e
Author: chenliang613 <ch...@apache.org>
Authored: Wed May 24 18:37:27 2017 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Wed May 24 18:37:27 2017 +0800
----------------------------------------------------------------------
.../presto/CarbondataRecordSetProvider.java | 277 +++++++++++++++++++
1 file changed, 277 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b682f54/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
new file mode 100755
index 0000000..63b926f
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import org.apache.carbondata.presto.impl.CarbonTableCacheModel;
+import org.apache.carbondata.presto.impl.CarbonTableReader;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.RecordSet;
+import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.spi.predicate.Domain;
+import com.facebook.presto.spi.predicate.Range;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.facebook.presto.spi.type.*;
+import com.google.common.collect.ImmutableList;
+import io.airlift.slice.Slice;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.*;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.expression.logical.OrExpression;
+import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.carbondata.presto.Types.checkType;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
+
+ private final String connectorId;
+ private final CarbonTableReader carbonTableReader;
+
+ @Inject
+ public CarbondataRecordSetProvider(CarbondataConnectorId connectorId, CarbonTableReader reader) {
+ //this.config = requireNonNull(config, "config is null");
+ //this.connector = requireNonNull(connector, "connector is null");
+ this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+ this.carbonTableReader = reader;
+ }
+
+ @Override public RecordSet getRecordSet(ConnectorTransactionHandle transactionHandle,
+ ConnectorSession session, ConnectorSplit split, List<? extends ColumnHandle> columns) {
+ requireNonNull(split, "split is null");
+ requireNonNull(columns, "columns is null");
+
+ // Convert split
+ CarbondataSplit cdSplit =
+ checkType(split, CarbondataSplit.class, "split is not class CarbondataSplit");
+ checkArgument(cdSplit.getConnectorId().equals(connectorId), "split is not for this connector");
+
+ String targetCols = "";
+ // Convert all columns handles
+ ImmutableList.Builder<CarbondataColumnHandle> handles = ImmutableList.builder();
+ for (ColumnHandle handle : columns) {
+ handles.add(checkType(handle, CarbondataColumnHandle.class, "handle"));
+ targetCols += ((CarbondataColumnHandle) handle).getColumnName() + ",";
+ }
+
+ // Build column projection(check the column order)
+ if (targetCols.length() > 0) {
+ targetCols = targetCols.substring(0, targetCols.length() - 1);
+ }
+ else
+ {
+ targetCols = null;
+ }
+ //String cols = String.join(",", columns.stream().map(a -> ((CarbondataColumnHandle)a).getColumnName()).collect(Collectors.toList()));
+
+ CarbonTableCacheModel tableCacheModel =
+ carbonTableReader.getCarbonCache(cdSplit.getSchemaTableName());
+ checkNotNull(tableCacheModel, "tableCacheModel should not be null");
+ checkNotNull(tableCacheModel.carbonTable, "tableCacheModel.carbonTable should not be null");
+ checkNotNull(tableCacheModel.tableInfo, "tableCacheModel.tableInfo should not be null");
+
+ // Build Query Model
+ CarbonTable targetTable = tableCacheModel.carbonTable;
+ CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(targetTable, targetCols);
+ QueryModel queryModel =
+ QueryModel.createModel(targetTable.getAbsoluteTableIdentifier(), queryPlan, targetTable);
+
+ // Push down filter
+ fillFilter2QueryModel(queryModel, cdSplit.getConstraints(), targetTable);
+
+ // Return new record set
+ return new CarbondataRecordSet(targetTable,/*connector,*/ session, /*config, */cdSplit,
+ handles.build(), queryModel);
+ }
+
+ // Build filter for QueryModel (copy from CarbonInputFormat=> createRecordReader)
+ private void fillFilter2QueryModel(QueryModel queryModel,
+ TupleDomain<ColumnHandle> originalConstraint, CarbonTable carbonTable) {
+
+ //queryModel.setFilterExpressionResolverTree(new FilterResolverIntf());
+
+ //Build Predicate Expression
+ ImmutableList.Builder<Expression> filters = ImmutableList.builder();
+
+ Domain domain = null;
+
+ for (ColumnHandle c : originalConstraint.getDomains().get().keySet()) {
+
+ // Build ColumnExpresstion for Expresstion(Carbondata)
+ CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
+ Type type = cdch.getColumnType();
+
+ DataType coltype = Spi2CarbondataTypeMapper(type);
+ Expression colExpression = new ColumnExpression(cdch.getColumnName(), coltype);
+
+ domain = originalConstraint.getDomains().get().get(c);
+ checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
+
+ if (domain.getValues().isNone()) {
+ //return QueryBuilders.filteredQuery(null, FilterBuilders.missingFilter(columnName));
+ //return domain.isNullAllowed() ? columnName + " IS NULL" : "FALSE";
+ //new Expression()
+ }
+
+ if (domain.getValues().isAll()) {
+ //return QueryBuilders.filteredQuery(null, FilterBuilders.existsFilter(columnName));
+ //return domain.isNullAllowed() ? "TRUE" : columnName + " IS NOT NULL";
+ }
+
+ List<Object> singleValues = new ArrayList<>();
+ List<Expression> rangeFilter = new ArrayList<>();
+ for (Range range : domain.getValues().getRanges().getOrderedRanges()) {
+ checkState(!range.isAll()); // Already checked
+ if (range.isSingleValue()) {
+ singleValues.add(range.getLow().getValue());
+ } else {
+ List<String> rangeConjuncts = new ArrayList<>();
+ if (!range.getLow().isLowerUnbounded()) {
+ Object value = ConvertDataByType(range.getLow().getValue(), type);
+ switch (range.getLow().getBound()) {
+ case ABOVE:
+ if (type == TimestampType.TIMESTAMP) {
+ //todo not now
+ } else {
+ GreaterThanExpression greater = new GreaterThanExpression(colExpression,
+ new LiteralExpression(value, coltype));
+ //greater.setRangeExpression(true);
+ rangeFilter.add(greater);
+ }
+ break;
+ case EXACTLY:
+ GreaterThanEqualToExpression greater =
+ new GreaterThanEqualToExpression(colExpression,
+ new LiteralExpression(value, coltype));
+ //greater.setRangeExpression(true);
+ rangeFilter.add(greater);
+ break;
+ case BELOW:
+ throw new IllegalArgumentException("Low marker should never use BELOW bound");
+ default:
+ throw new AssertionError("Unhandled bound: " + range.getLow().getBound());
+ }
+ }
+ if (!range.getHigh().isUpperUnbounded()) {
+ Object value = ConvertDataByType(range.getHigh().getValue(), type);
+ switch (range.getHigh().getBound()) {
+ case ABOVE:
+ throw new IllegalArgumentException("High marker should never use ABOVE bound");
+ case EXACTLY:
+ LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression,
+ new LiteralExpression(value, coltype));
+ //less.setRangeExpression(true);
+ rangeFilter.add(less);
+ break;
+ case BELOW:
+ LessThanExpression less2 =
+ new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
+ //less2.setRangeExpression(true);
+ rangeFilter.add(less2);
+ break;
+ default:
+ throw new AssertionError("Unhandled bound: " + range.getHigh().getBound());
+ }
+ }
+ }
+ }
+
+ if (singleValues.size() == 1) {
+ Expression ex = null;
+ if (coltype.equals(DataType.STRING)) {
+ ex = new EqualToExpression(colExpression,
+ new LiteralExpression(((Slice) singleValues.get(0)).toStringUtf8(), coltype));
+ } else ex = new EqualToExpression(colExpression,
+ new LiteralExpression(singleValues.get(0), coltype));
+ filters.add(ex);
+ } else if (singleValues.size() > 1) {
+ ListExpression candidates = null;
+ List<Expression> exs = singleValues.stream().map((a) -> {
+ return new LiteralExpression(ConvertDataByType(a, type), coltype);
+ }).collect(Collectors.toList());
+ candidates = new ListExpression(exs);
+
+ if (candidates != null) filters.add(new InExpression(colExpression, candidates));
+ } else if (rangeFilter.size() > 0) {
+ if (rangeFilter.size() > 1) {
+ Expression finalFilters = new OrExpression(rangeFilter.get(0), rangeFilter.get(1));
+ if (rangeFilter.size() > 2) {
+ for (int i = 2; i < rangeFilter.size(); i++) {
+ filters.add(new AndExpression(finalFilters, rangeFilter.get(i)));
+ }
+ }
+ } else if (rangeFilter.size() == 1) filters.add(rangeFilter.get(0));
+ }
+ }
+
+ Expression finalFilters;
+ List<Expression> tmp = filters.build();
+ if (tmp.size() > 1) {
+ finalFilters = new AndExpression(tmp.get(0), tmp.get(1));
+ if (tmp.size() > 2) {
+ for (int i = 2; i < tmp.size(); i++) {
+ finalFilters = new AndExpression(finalFilters, tmp.get(i));
+ }
+ }
+ } else if (tmp.size() == 1) finalFilters = tmp.get(0);
+ else return;
+
+ // todo set into QueryModel
+ CarbonInputFormatUtil.processFilterExpression(finalFilters, carbonTable);
+ queryModel.setFilterExpressionResolverTree(
+ CarbonInputFormatUtil.resolveFilter(finalFilters, queryModel.getAbsoluteTableIdentifier()));
+ }
+
+ public static DataType Spi2CarbondataTypeMapper(Type colType) {
+ if (colType == BooleanType.BOOLEAN) return DataType.BOOLEAN;
+ else if (colType == SmallintType.SMALLINT) return DataType.SHORT;
+ else if (colType == IntegerType.INTEGER) return DataType.INT;
+ else if (colType == BigintType.BIGINT) return DataType.LONG;
+ else if (colType == DoubleType.DOUBLE) return DataType.DOUBLE;
+ else if (colType == DecimalType.createDecimalType()) return DataType.DECIMAL;
+ else if (colType == VarcharType.VARCHAR) return DataType.STRING;
+ else if (colType == DateType.DATE) return DataType.DATE;
+ else if (colType == TimestampType.TIMESTAMP) return DataType.TIMESTAMP;
+ else return DataType.STRING;
+ }
+
+ public Object ConvertDataByType(Object rawdata, Type type) {
+ if (type.equals(IntegerType.INTEGER)) return new Integer((rawdata.toString()));
+ else if (type.equals(BigintType.BIGINT)) return (Long) rawdata;
+ else if (type.equals(VarcharType.VARCHAR)) return ((Slice) rawdata).toStringUtf8();
+ else if (type.equals(BooleanType.BOOLEAN)) return (Boolean) (rawdata);
+
+ return rawdata;
+ }
+}