You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/03/23 08:22:16 UTC
[3/5] incubator-carbondata git commit: Fix groupid, package name,
Class name issues
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataSplitManager.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataSplitManager.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataSplitManager.java
deleted file mode 100755
index 692d69e..0000000
--- a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataSplitManager.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.facebook.presto.carbondata;
-
-import com.facebook.presto.carbondata.impl.CarbonLocalInputSplit;
-import com.facebook.presto.carbondata.impl.CarbonTableCacheModel;
-import com.facebook.presto.carbondata.impl.CarbonTableReader;
-import com.facebook.presto.spi.*;
-import com.facebook.presto.spi.connector.ConnectorSplitManager;
-import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
-import com.facebook.presto.spi.predicate.Domain;
-import com.facebook.presto.spi.predicate.Range;
-import com.facebook.presto.spi.predicate.TupleDomain;
-import com.facebook.presto.spi.type.*;
-import com.google.common.collect.ImmutableList;
-import io.airlift.slice.Slice;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.scan.expression.ColumnExpression;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.expression.LiteralExpression;
-import org.apache.carbondata.core.scan.expression.conditional.*;
-import org.apache.carbondata.core.scan.expression.logical.AndExpression;
-import org.apache.carbondata.core.scan.expression.logical.OrExpression;
-
-import javax.inject.Inject;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-import static com.facebook.presto.carbondata.Types.checkType;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-import static java.util.Objects.requireNonNull;
-
-public class CarbondataSplitManager
- implements ConnectorSplitManager
-{
-
- private final String connectorId;
- private final CarbonTableReader carbonTableReader;
-
- @Inject
- public CarbondataSplitManager(CarbondataConnectorId connectorId, CarbonTableReader reader)
- {
- this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
- this.carbonTableReader = requireNonNull(reader, "client is null");
- }
-
- public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout)
- {
- CarbondataTableLayoutHandle layoutHandle = (CarbondataTableLayoutHandle)layout;
- CarbondataTableHandle tableHandle = layoutHandle.getTable();
- SchemaTableName key = tableHandle.getSchemaTableName();
-
- //get all filter domain
- List<CarbondataColumnConstraint> rebuildConstraints = getColumnConstraints(layoutHandle.getConstraint());
-
- CarbonTableCacheModel cache = carbonTableReader.getCarbonCache(key);
- Expression filters = parseFilterExpression(layoutHandle.getConstraint(), cache.carbonTable);
-
- if(cache != null) {
- try {
- List<CarbonLocalInputSplit> splits = carbonTableReader.getInputSplits2(cache, filters);
-
- ImmutableList.Builder<ConnectorSplit> cSplits = ImmutableList.builder();
- for (CarbonLocalInputSplit split : splits) {
- cSplits.add(new CarbondataSplit(
- connectorId,
- tableHandle.getSchemaTableName(),
- layoutHandle.getConstraint(),
- split,
- rebuildConstraints
- ));
- }
- return new FixedSplitSource(cSplits.build());
- } catch (Exception ex) {
- System.out.println(ex.toString());
- }
- }
- return null;
- }
-
-
- public List<CarbondataColumnConstraint> getColumnConstraints(TupleDomain<ColumnHandle> constraint)
- {
- ImmutableList.Builder<CarbondataColumnConstraint> constraintBuilder = ImmutableList.builder();
- for (TupleDomain.ColumnDomain<ColumnHandle> columnDomain : constraint.getColumnDomains().get()) {
- CarbondataColumnHandle columnHandle = checkType(columnDomain.getColumn(), CarbondataColumnHandle.class, "column handle");
-
- constraintBuilder.add(new CarbondataColumnConstraint(
- columnHandle.getColumnName(),
- Optional.of(columnDomain.getDomain()),
- columnHandle.isInvertedIndex()));
- }
-
- return constraintBuilder.build();
- }
-
-
- public Expression parseFilterExpression(TupleDomain<ColumnHandle> originalConstraint, CarbonTable carbonTable)
- {
- ImmutableList.Builder<Expression> filters = ImmutableList.builder();
-
- Domain domain = null;
-
- for (ColumnHandle c : originalConstraint.getDomains().get().keySet()) {
-
- CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
- Type type = cdch.getColumnType();
-
- List<CarbonColumn> ccols = carbonTable.getCreateOrderColumn(carbonTable.getFactTableName());
- Optional<CarbonColumn> target = ccols.stream().filter(a -> a.getColName().equals(cdch.getColumnName())).findFirst();
-
- if(target.get() == null)
- return null;
-
- DataType coltype = target.get().getDataType();
- ColumnExpression colExpression = new ColumnExpression(cdch.getColumnName(), target.get().getDataType());
- //colExpression.setColIndex(cs.getSchemaOrdinal());
- colExpression.setDimension(target.get().isDimesion());
- colExpression.setDimension(carbonTable.getDimensionByName(carbonTable.getFactTableName(), cdch.getColumnName()));
- colExpression.setCarbonColumn(target.get());
-
- domain = originalConstraint.getDomains().get().get(c);
- checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
-
- if (domain.getValues().isNone()) {
- //return QueryBuilders.filteredQuery(null, FilterBuilders.missingFilter(columnName));
- //return domain.isNullAllowed() ? columnName + " IS NULL" : "FALSE";
- //new Expression()
- }
-
- if (domain.getValues().isAll()) {
- //return QueryBuilders.filteredQuery(null, FilterBuilders.existsFilter(columnName));
- //return domain.isNullAllowed() ? "TRUE" : columnName + " IS NOT NULL";
- }
-
- List<Object> singleValues = new ArrayList<>();
- List<Expression> rangeFilter = new ArrayList<>();
- for (Range range : domain.getValues().getRanges().getOrderedRanges()) {
- checkState(!range.isAll()); // Already checked
- if (range.isSingleValue()) {
- singleValues.add(range.getLow().getValue());
- }
- else
- {
- List<String> rangeConjuncts = new ArrayList<>();
- if (!range.getLow().isLowerUnbounded()) {
- Object value = ConvertDataByType(range.getLow().getValue(), type);
- switch (range.getLow().getBound()) {
- case ABOVE:
- if (type == TimestampType.TIMESTAMP) {
- //todo not now
- } else {
- GreaterThanExpression greater = new GreaterThanExpression(colExpression, new LiteralExpression(value, coltype));
- //greater.setRangeExpression(true);
- rangeFilter.add(greater);
- }
- break;
- case EXACTLY:
- GreaterThanEqualToExpression greater = new GreaterThanEqualToExpression(colExpression, new LiteralExpression(value, coltype));
- //greater.setRangeExpression(true);
- rangeFilter.add(greater);
- break;
- case BELOW:
- throw new IllegalArgumentException("Low marker should never use BELOW bound");
- default:
- throw new AssertionError("Unhandled bound: " + range.getLow().getBound());
- }
- }
- if (!range.getHigh().isUpperUnbounded()) {
- Object value = ConvertDataByType(range.getHigh().getValue(), type);
- switch (range.getHigh().getBound()) {
- case ABOVE:
- throw new IllegalArgumentException("High marker should never use ABOVE bound");
- case EXACTLY:
- LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression, new LiteralExpression(value, coltype));
- //less.setRangeExpression(true);
- rangeFilter.add(less);
- break;
- case BELOW:
- LessThanExpression less2 = new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
- //less2.setRangeExpression(true);
- rangeFilter.add(less2);
- break;
- default:
- throw new AssertionError("Unhandled bound: " + range.getHigh().getBound());
- }
- }
- }
- }
-
- if (singleValues.size() == 1) {
- Expression ex = null;
- if (coltype.equals(DataType.STRING)) {
- ex = new EqualToExpression(colExpression, new LiteralExpression(((Slice) singleValues.get(0)).toStringUtf8(), coltype));
- } else
- ex = new EqualToExpression(colExpression, new LiteralExpression(singleValues.get(0), coltype));
- filters.add(ex);
- }
- else if(singleValues.size() > 1) {
- ListExpression candidates = null;
- List<Expression> exs = singleValues.stream().map((a) ->
- {
- return new LiteralExpression(ConvertDataByType(a, type), coltype);
- }).collect(Collectors.toList());
- candidates = new ListExpression(exs);
-
- if(candidates != null)
- filters.add(new InExpression(colExpression, candidates));
- }
- else if(rangeFilter.size() > 0){
- if(rangeFilter.size() > 1) {
- Expression finalFilters = new OrExpression(rangeFilter.get(0), rangeFilter.get(1));
- if(rangeFilter.size() > 2)
- {
- for(int i = 2; i< rangeFilter.size(); i++)
- {
- filters.add(new AndExpression(finalFilters, rangeFilter.get(i)));
- }
- }
- }
- else if(rangeFilter.size() == 1)//only have one value
- filters.add(rangeFilter.get(0));
- }
- }
-
- Expression finalFilters;
- List<Expression> tmp = filters.build();
- if(tmp.size() > 1) {
- finalFilters = new AndExpression(tmp.get(0), tmp.get(1));
- if(tmp.size() > 2)
- {
- for(int i = 2; i< tmp.size(); i++)
- {
- finalFilters = new AndExpression(finalFilters, tmp.get(i));
- }
- }
- }
- else if(tmp.size() == 1)
- finalFilters = tmp.get(0);
- else//no filter
- return null;
-
- return finalFilters;
- }
-
- public static DataType Spi2CarbondataTypeMapper(Type colType)
- {
- if(colType == BooleanType.BOOLEAN)
- return DataType.BOOLEAN;
- else if(colType == SmallintType.SMALLINT)
- return DataType.SHORT;
- else if(colType == IntegerType.INTEGER)
- return DataType.INT;
- else if(colType == BigintType.BIGINT)
- return DataType.LONG;
- else if(colType == DoubleType.DOUBLE)
- return DataType.DOUBLE;
- else if(colType == DecimalType.createDecimalType())
- return DataType.DECIMAL;
- else if(colType == VarcharType.VARCHAR)
- return DataType.STRING;
- else if(colType == DateType.DATE)
- return DataType.DATE;
- else if(colType == TimestampType.TIMESTAMP)
- return DataType.TIMESTAMP;
- else
- return DataType.STRING;
- }
-
-
- public Object ConvertDataByType(Object rawdata, Type type)
- {
- if(type.equals(IntegerType.INTEGER))
- return new Integer((rawdata.toString()));
- else if(type.equals(BigintType.BIGINT))
- return (Long)rawdata;
- else if(type.equals(VarcharType.VARCHAR))
- return ((Slice)rawdata).toStringUtf8();
- else if(type.equals(BooleanType.BOOLEAN))
- return (Boolean)(rawdata);
-
- return rawdata;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableHandle.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableHandle.java
deleted file mode 100755
index 6b263e0..0000000
--- a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableHandle.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.facebook.presto.carbondata;
-
-import com.facebook.presto.spi.ConnectorTableHandle;
-import com.facebook.presto.spi.SchemaTableName;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Joiner;
-
-import java.util.Objects;
-
-import static java.util.Locale.ENGLISH;
-import static java.util.Objects.requireNonNull;
-
-/**
- * Table handle of the CarbonData connector: a connector id plus the schema-qualified
- * table name. Instances are immutable and are serialized to and from JSON via Jackson.
- */
-public class CarbondataTableHandle
-        implements ConnectorTableHandle
-{
-    private final String connectorId;
-    private final SchemaTableName schemaTableName;
-
-    /**
-     * @param connectorId the connector id; stored lower-cased using the English locale
-     * @param schemaTableName the schema-qualified table name
-     * @throws NullPointerException if either argument is {@code null}
-     */
-    @JsonCreator
-    public CarbondataTableHandle(
-            @JsonProperty("connectorId") String connectorId,
-            @JsonProperty("schemaTableName") SchemaTableName schemaTableName)
-    {
-        // Check for null before dereferencing so the failure carries the intended message.
-        this.connectorId = requireNonNull(connectorId, "connectorId is null").toLowerCase(ENGLISH);
-        // equals() and toString() dereference this field, so reject null up front.
-        this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
-    }
-
-    @JsonProperty
-    public String getConnectorId()
-    {
-        return connectorId;
-    }
-
-    @JsonProperty
-    public SchemaTableName getSchemaTableName()
-    {
-        return schemaTableName;
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return Objects.hash(connectorId, schemaTableName);
-    }
-
-    @Override
-    public boolean equals(Object obj)
-    {
-        if (this == obj) {
-            return true;
-        }
-        if ((obj == null) || (getClass() != obj.getClass())) {
-            return false;
-        }
-
-        CarbondataTableHandle other = (CarbondataTableHandle) obj;
-        return Objects.equals(this.connectorId, other.connectorId)
-                && Objects.equals(this.schemaTableName, other.schemaTableName);
-    }
-
-    /**
-     * @return the connector id and the table name joined by {@code ':'}
-     */
-    @Override
-    public String toString()
-    {
-        return Joiner.on(":").join(connectorId, schemaTableName.toString());
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableLayoutHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableLayoutHandle.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableLayoutHandle.java
deleted file mode 100755
index 01434bd..0000000
--- a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableLayoutHandle.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.facebook.presto.carbondata;
-
-import com.facebook.presto.spi.ColumnHandle;
-import com.facebook.presto.spi.ConnectorTableLayoutHandle;
-import com.facebook.presto.spi.predicate.TupleDomain;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.Objects;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-import static java.util.Objects.requireNonNull;
-
-public class CarbondataTableLayoutHandle
- implements ConnectorTableLayoutHandle
-{
- private final CarbondataTableHandle table;
- private final TupleDomain<ColumnHandle> constraint;
-
- @JsonCreator
- public CarbondataTableLayoutHandle(@JsonProperty("table") CarbondataTableHandle table,
- @JsonProperty("constraint") TupleDomain<ColumnHandle> constraint)
- {
- this.table = requireNonNull(table, "table is null");
- this.constraint = requireNonNull(constraint, "constraint is null");
- }
-
- @JsonProperty
- public CarbondataTableHandle getTable()
- {
- return table;
- }
-
- @JsonProperty
- public TupleDomain<ColumnHandle> getConstraint()
- {
- return constraint;
- }
-
- @Override
- public boolean equals(Object obj)
- {
- if (this == obj) {
- return true;
- }
-
- if (obj == null || getClass() != obj.getClass()) {
- return false;
- }
-
- CarbondataTableLayoutHandle other = (CarbondataTableLayoutHandle) obj;
- return Objects.equals(table, other.table)
- && Objects.equals(constraint, other.constraint);
- }
-
- @Override
- public int hashCode()
- {
- return Objects.hash(table, constraint);
- }
-
- @Override
- public String toString()
- {
- return toStringHelper(this)
- .add("table", table)
- .add("constraint", constraint)
- .toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTransactionHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTransactionHandle.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTransactionHandle.java
deleted file mode 100755
index a643a33..0000000
--- a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTransactionHandle.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.facebook.presto.carbondata;
-
-import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
-
-/**
- * Transaction handle of the CarbonData connector. The enum has a single constant, so
- * every transaction is represented by the same handle.
- */
-public enum CarbondataTransactionHandle implements ConnectorTransactionHandle {
-    /** The only handle instance. */
-    INSTANCE;
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/com/facebook/presto/carbondata/Types.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/Types.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/Types.java
deleted file mode 100755
index 5212dad..0000000
--- a/integration/presto/src/main/java/com/facebook/presto/carbondata/Types.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.facebook.presto.carbondata;
-
-import java.util.Locale;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.Objects.requireNonNull;
-
-public class Types {
- private Types() {}
-
- public static <A, B extends A> B checkType(A value, Class<B> target, String name)
- {
- requireNonNull(value, String.format(Locale.ENGLISH, "%s is null", name));
- checkArgument(target.isInstance(value),
- "%s must be of type %s, not %s",
- name,
- target.getName(),
- value.getClass().getName());
- return target.cast(value);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonLocalInputSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonLocalInputSplit.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonLocalInputSplit.java
deleted file mode 100755
index 6084022..0000000
--- a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonLocalInputSplit.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.facebook.presto.carbondata.impl;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.List;
-
-public class CarbonLocalInputSplit {
-
- private static final long serialVersionUID = 3520344046772190207L;
- private String segmentId;
- private String path;
- private long start;
- private long length;
- private List<String> locations;
- private short version;
- /**
- * Number of BlockLets in a block
- */
- private int numberOfBlocklets = 0;
-
-
- @JsonProperty
- public short getVersion(){
- return version;
- }
-
- @JsonProperty
- public List<String> getLocations() {
- return locations;
- }
-
- @JsonProperty
- public long getLength() {
- return length;
- }
-
- @JsonProperty
- public long getStart() {
- return start;
- }
-
- @JsonProperty
- public String getPath() {
- return path;
- }
-
- @JsonProperty
- public String getSegmentId() {
- return segmentId;
- }
-
- @JsonProperty
- public int getNumberOfBlocklets() {
- return numberOfBlocklets;
- }
-
- @JsonCreator
- public CarbonLocalInputSplit(@JsonProperty("segmentId") String segmentId,
- @JsonProperty("path") String path,
- @JsonProperty("start") long start,
- @JsonProperty("length") long length,
- @JsonProperty("locations") List<String> locations,
- @JsonProperty("numberOfBlocklets") int numberOfBlocklets/*,
- @JsonProperty("tableBlockInfo") TableBlockInfo tableBlockInfo*/,
- @JsonProperty("version") short version) {
- this.path = path;
- this.start = start;
- this.length = length;
- this.segmentId = segmentId;
- this.locations = locations;
- this.numberOfBlocklets = numberOfBlocklets;
- //this.tableBlockInfo = tableBlockInfo;
- this.version = version;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableCacheModel.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableCacheModel.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableCacheModel.java
deleted file mode 100755
index d47f2a5..0000000
--- a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableCacheModel.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.facebook.presto.carbondata.impl;
-
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-
-public class CarbonTableCacheModel {
-
- public CarbonTableIdentifier carbonTableIdentifier;
- public CarbonTablePath carbonTablePath;
-
- public TableInfo tableInfo;
- public CarbonTable carbonTable;
- public String[] segments;
-
- public boolean isValid()
- {
- if(carbonTable != null
- && carbonTablePath != null
- && carbonTableIdentifier != null)
- return true;
- else
- return false;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableConfig.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableConfig.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableConfig.java
deleted file mode 100755
index cd52b85..0000000
--- a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableConfig.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.facebook.presto.carbondata.impl;
-
-import io.airlift.configuration.Config;
-
-import javax.validation.constraints.NotNull;
-
-public class CarbonTableConfig {
- //read from config
- private String dbPtah;
- private String tablePath;
- private String storePath;
-
- @NotNull
- public String getDbPtah() {
- return dbPtah;
- }
-
- @Config("carbondata-store")
- public CarbonTableConfig setDbPtah(String dbPtah) {
- this.dbPtah = dbPtah;
- return this;
- }
-
- @NotNull
- public String getTablePath() {
- return tablePath;
- }
-
- @Config("carbondata-store")
- public CarbonTableConfig setTablePath(String tablePath) {
- this.tablePath = tablePath;
- return this;
- }
-
- @NotNull
- public String getStorePath() {
- return storePath;
- }
-
- @Config("carbondata-store")
- public CarbonTableConfig setStorePath(String storePath) {
- this.storePath = storePath;
- return this;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableReader.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableReader.java
deleted file mode 100755
index 40bb841..0000000
--- a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableReader.java
+++ /dev/null
@@ -1,736 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.facebook.presto.carbondata.impl;
-
-import com.facebook.presto.spi.SchemaTableName;
-import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.inject.Inject;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.*;
-import org.apache.carbondata.core.datastore.block.*;
-import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.datastore.impl.btree.BTreeDataRefNodeFinder;
-import org.apache.carbondata.core.datastore.impl.btree.BlockBTreeLeafNode;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
-import org.apache.carbondata.core.metadata.converter.SchemaConverter;
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.mutate.UpdateVO;
-import org.apache.carbondata.core.reader.ThriftReader;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
-import org.apache.carbondata.core.scan.filter.FilterUtil;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.service.impl.PathFactory;
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.hadoop.CacheClient;
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.thrift.TBase;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URI;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import static java.util.Objects.requireNonNull;
-
-public class CarbonTableReader {
- // CarbonTableReader is intended to act as a facade over these utilities:
- // [
- //   1: CarbonMetadata        (logical table)
- //   2: FileFactory           (physical table files)
- //   3: CarbonCommonFactory   (NOTE(review): original description was left unfinished)
- //   4: DictionaryFactory     (dictionary parsing utility)
- // ]
-
- // Connector configuration; supplies the store path.
- private CarbonTableConfig config;
- // Tables discovered under the store; rebuilt by updateSchemaTables().
- private List<SchemaTableName> tableList;
- // Root of the store, resolved lazily from config.getStorePath(); null until first resolved.
- private CarbonFile dbStore;
- // File type of the store path, set together with dbStore.
- private FileFactory.FileType fileType;
-
- private ConcurrentHashMap<SchemaTableName, CarbonTableCacheModel> cc;// per-table cache used by this reader
-
- /**
-  * Creates a reader for the given configuration, starting with an empty table cache.
-  *
-  * @param config connector configuration; must not be null
-  * @throws NullPointerException if {@code config} is null
-  */
- @Inject
- public CarbonTableReader(CarbonTableConfig config) {
-   this.cc = new ConcurrentHashMap<>();
-   this.config = requireNonNull(config, "CarbonTableConfig is null");
- }
-
- public CarbonTableCacheModel getCarbonCache(SchemaTableName table){
- if(!cc.containsKey(table))//for worker node to initalize carbon metastore
- {
- try(ThreadContextClassLoader ignored = new ThreadContextClassLoader(FileFactory.class.getClassLoader())) {
- if(dbStore == null) {
- fileType = FileFactory.getFileType(config.getStorePath());
- try{
- dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
- }catch (Exception ex){
- throw new RuntimeException(ex);
- }
- }
- }
- updateSchemaTables();
- parseCarbonMetadata(table);
- }
-
- if(cc.containsKey(table))
- return cc.get(table);
- else
- return null;//need to reload?*/
- }
-
- /**
-  * Lists the schema names, delegating to {@link #updateSchemaList()}.
-  *
-  * @return names of the entries found directly under the store root
-  */
- public List<String> getSchemaNames() {
-   List<String> schemaNames = updateSchemaList();
-   return schemaNames;
- }
-
- // Default PathFilter: accepts a path when its file name is recognised as a CarbonData file.
- private static final PathFilter DefaultFilter =
-     path -> CarbonTablePath.isCarbonDataFile(path.getName());
-
- /**
-  * Resolves the store root from the configured store path if that has not happened yet.
-  *
-  * @return always {@code true}
-  * @throws RuntimeException wrapping any exception raised while opening the store path
-  */
- public boolean updateDbStore() {
-   if (dbStore != null) {
-     return true;
-   }
-
-   fileType = FileFactory.getFileType(config.getStorePath());
-   try {
-     dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
-   } catch (Exception ex) {
-     throw new RuntimeException(ex);
-   }
-   return true;
- }
-
- public List<String> updateSchemaList() {
- updateDbStore();
-
- if(dbStore != null){
- List<String> scs = Stream.of(dbStore.listFiles()).map(a -> a.getName()).collect(Collectors.toList());
- return scs;
- }
- else
- return ImmutableList.of();
- }
-
-
- /**
-  * Lists the table names found under the given schema.
-  *
-  * @param schema schema (database) name; must not be null
-  * @return table names, as produced by {@link #updateTableList(String)}
-  * @throws NullPointerException if {@code schema} is null
-  */
- public Set<String> getTableNames(String schema) {
-   requireNonNull(schema, "schema is null");
-   Set<String> tableNames = updateTableList(schema);
-   return tableNames;
- }
-
- public Set<String> updateTableList(String dbName){
- List<CarbonFile> schema = Stream.of(dbStore.listFiles()).filter(a -> dbName.equals(a.getName())).collect(Collectors.toList());
- if(schema.size() > 0)
- {
- return Stream.of((schema.get(0)).listFiles()).map(a -> a.getName()).collect(Collectors.toSet());
- }
- else
- return ImmutableSet.of();
- }
-
- /**
-  * Refreshes the table list and returns the metadata of the requested table.
-  *
-  * @param schemaTableName table to load; must not be null
-  * @return the table, or {@code null} when it is not in the refreshed table list
-  * @throws RuntimeException wrapping any exception raised while refreshing the table list
-  * @throws NullPointerException if {@code schemaTableName} is null
-  */
- public CarbonTable getTable(SchemaTableName schemaTableName) {
-   try {
-     updateSchemaTables();
-   } catch (Exception e) {
-     throw new RuntimeException(e);
-   }
-
-   // The null check deliberately stays after the refresh to keep the original ordering.
-   requireNonNull(schemaTableName, "schemaTableName is null");
-   return loadTableMetadata(schemaTableName);
- }
-
-
- /**
-  * Rebuilds {@code tableList} by walking two directory levels below the store root:
-  * each first-level entry is a schema and each of its children a table. First-level
-  * entries whose name ends with ".mdt" are skipped.
-  */
- public void updateSchemaTables() {
-   // The refresh policy is still to be decided; for now only resolve the store if needed.
-   if (null == dbStore) {
-     updateSchemaList();
-   }
-
-   tableList = new LinkedList<>();
-   for (CarbonFile db : dbStore.listFiles()) {
-     String schemaName = db.getName();
-     if (schemaName.endsWith(".mdt")) {
-       continue;
-     }
-     for (CarbonFile table : db.listFiles()) {
-       tableList.add(new SchemaTableName(schemaName, table.getName()));
-     }
-   }
- }
-
- /**
-  * Parses the metadata of {@code schemaTableName} if it appears in {@code tableList}.
-  *
-  * @return the parsed table, or {@code null} when the table is not in the list
-  */
- private CarbonTable loadTableMetadata(SchemaTableName schemaTableName) {
-   for (SchemaTableName candidate : tableList) {
-     if (candidate.equals(schemaTableName)) {
-       return parseCarbonMetadata(candidate);
-     }
-   }
-   return null;
- }
-
- /**
-  * Parses the carbon metadata of {@code table} and stores it in {@code cc}, the reader's cache.
-  *
-  * <p>If a valid cache entry already exists its table is returned directly. Otherwise the
-  * schema file is read through a {@code ThriftReader}, converted to the wrapper
-  * {@code TableInfo}, and registered with {@code CarbonMetadata}.
-  *
-  * @param table schema-qualified name of the table to parse
-  * @return the loaded carbon table
-  * @throws RuntimeException wrapping any exception raised while reading or converting
-  **/
- public CarbonTable parseCarbonMetadata(SchemaTableName table) {
-   try {
-     // NOTE: the original author remarked that this logic should live in a StoreFactory.
-     CarbonTableCacheModel cache = cc.getOrDefault(table, new CarbonTableCacheModel());
-     if (cache.isValid()) {
-       return cache.carbonTable;
-     }
-
-     // Step 1: derive the table path and register the (still incomplete) cache entry.
-     String storePath = config.getStorePath();
-     cache.carbonTableIdentifier = new CarbonTableIdentifier(table.getSchemaName(), table.getTableName(), UUID.randomUUID().toString());
-     cache.carbonTablePath = PathFactory.getInstance().getCarbonTablePath(storePath, cache.carbonTableIdentifier);
-     cc.put(table, cache);
-
-     // Step 2: read the schema file.
-     ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
-       public TBase create() {
-         return new org.apache.carbondata.format.TableInfo();
-       }
-     };
-     ThriftReader thriftReader =
-         new ThriftReader(cache.carbonTablePath.getSchemaFilePath(), createTBase);
-     org.apache.carbondata.format.TableInfo tableInfo;
-     thriftReader.open();
-     try {
-       tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
-     } finally {
-       // Close the reader even when read() fails so the underlying stream is not leaked.
-       thriftReader.close();
-     }
-
-     // The format-level TableInfo has to be converted into the code-level (wrapper) TableInfo.
-     SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
-     TableInfo wrapperTableInfo = schemaConverter
-         .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
-             storePath);
-     wrapperTableInfo.setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(cache.carbonTablePath.getSchemaFilePath()));
-     // Load it into the CarbonMetadata repository.
-     CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
-
-     cache.tableInfo = wrapperTableInfo;
-     cache.carbonTable = CarbonMetadata.getInstance().getCarbonTable(cache.carbonTableIdentifier.getTableUniqueName());
-     return cache.carbonTable;
-   } catch (Exception ex) {
-     throw new RuntimeException(ex);
-   }
- }
-
- /**
-  * Computes the input splits of a table: for every valid segment the blocks matching
-  * {@code filters} are looked up in the segment index and turned into
-  * {@code CarbonLocalInputSplit}s.
-  *
-  * <p>The {@code CacheClient} opened here is closed on every exit path, including the early
-  * return for a table without valid segments and any failure.
-  *
-  * @param tableCacheModel cached table model; its {@code segments} field is overwritten with
-  *     the currently valid segments
-  * @param filters filter expression to resolve against the table
-  * @return the splits of all valid segments; empty when there is no valid segment
-  * @throws Exception if segment status or filter resolution fails; failures while processing
-  *     a single segment are wrapped in a {@code RuntimeException}
-  */
- public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel, Expression filters) throws Exception {
-
-   // Processes the filter: it is pushed down and applied to the segment index.
-   FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor();
-
-   AbsoluteTableIdentifier absoluteTableIdentifier = tableCacheModel.carbonTable.getAbsoluteTableIdentifier();
-   CacheClient cacheClient = new CacheClient(absoluteTableIdentifier.getStorePath());
-   try {
-     List<String> invalidSegments = new ArrayList<>();
-     List<UpdateVO> invalidTimestampsList = new ArrayList<>();
-
-     // get all valid segments and set them into the configuration
-     SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(absoluteTableIdentifier);
-     SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
-     SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager.getValidAndInvalidSegments();
-
-     tableCacheModel.segments = segments.getValidSegments().toArray(new String[0]);
-     if (segments.getValidSegments().size() == 0) {
-       return new ArrayList<>(0);
-     }
-
-     // remove entry in the segment index if there are invalid segments
-     invalidSegments.addAll(segments.getInvalidSegments());
-     for (String invalidSegmentId : invalidSegments) {
-       invalidTimestampsList.add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId));
-     }
-     if (invalidSegments.size() > 0) {
-       List<TableSegmentUniqueIdentifier> invalidSegmentsIds = new ArrayList<>(invalidSegments.size());
-       for (String segId : invalidSegments) {
-         invalidSegmentsIds.add(new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segId));
-       }
-       cacheClient.getSegmentAccessClient().invalidateAll(invalidSegmentsIds);
-     }
-
-     // get filter for segment
-     CarbonInputFormatUtil.processFilterExpression(filters, tableCacheModel.carbonTable);
-     FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filters, tableCacheModel.carbonTable.getAbsoluteTableIdentifier());
-
-     List<CarbonLocalInputSplit> result = new ArrayList<>();
-     // for each segment fetch blocks matching filter in Driver BTree
-     for (String segmentNo : tableCacheModel.segments) {
-       try {
-         List<DataRefNode> dataRefNodes = getDataBlocksOfSegment(filterExpressionProcessor, absoluteTableIdentifier, tableCacheModel.carbonTablePath, filterInterface, segmentNo, cacheClient, updateStatusManager);
-         for (DataRefNode dataRefNode : dataRefNodes) {
-           BlockBTreeLeafNode leafNode = (BlockBTreeLeafNode) dataRefNode;
-           TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
-
-           if (CarbonUtil.isInvalidTableBlock(tableBlockInfo, updateStatusManager.getInvalidTimestampRange(tableBlockInfo.getSegmentId()), updateStatusManager)) {
-             continue;
-           }
-           result.add(new CarbonLocalInputSplit(segmentNo, tableBlockInfo.getFilePath(),
-               tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(),
-               Arrays.asList(tableBlockInfo.getLocations()), tableBlockInfo.getBlockletInfos().getNoOfBlockLets(),
-               tableBlockInfo.getVersion().number()));
-         }
-       } catch (Exception ex) {
-         throw new RuntimeException(ex);
-       }
-     }
-     return result;
-   } finally {
-     // Previously skipped on the early return above and whenever an exception escaped.
-     cacheClient.close();
-   }
- }
-
- /**
-  * Collects the data blocks of one segment, optionally restricted by a resolved filter.
-  *
-  * @param resolver resolved filter; when {@code null} every block of each index is returned
-  * @param segmentId segment whose index is consulted
-  * @return the selected blocks; empty when the segment has no index map
-  * @throws IOException if the segment index cannot be loaded
-  */
- private List<DataRefNode> getDataBlocksOfSegment(FilterExpressionProcessor filterExpressionProcessor,
-     AbsoluteTableIdentifier absoluteTableIdentifier,
-     CarbonTablePath tablePath,
-     FilterResolverIntf resolver,
-     String segmentId,
-     CacheClient cacheClient,
-     SegmentUpdateStatusManager updateStatusManager) throws IOException {
-   // Driver-side query statistics recording is currently disabled here.
-
-   // Read the index held inside the segment.
-   Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> indexByTask =
-       getSegmentAbstractIndexs(absoluteTableIdentifier, tablePath, segmentId, cacheClient,
-           updateStatusManager);
-
-   List<DataRefNode> selectedBlocks = new LinkedList<DataRefNode>();
-   if (indexByTask == null) {
-     return selectedBlocks;
-   }
-
-   for (AbstractIndex index : indexByTask.values()) {
-     if (resolver == null) {
-       // No filter given: take every block of the B-tree index.
-       selectedBlocks.addAll(getDataBlocksOfIndex(index));
-     } else {
-       // Apply the filter and keep only the matching blocks.
-       selectedBlocks.addAll(filterExpressionProcessor
-           .getFilterredBlocks(index.getDataRefNode(), resolver, index,
-               absoluteTableIdentifier));
-     }
-   }
-   return selectedBlocks;
- }
-
- /**
-  * Tells whether the segment was updated after the cached index was last refreshed.
-  *
-  * @return {@code true} when a latest-update timestamp exists and is greater than the
-  *     wrapper's refreshed timestamp
-  */
- private boolean isSegmentUpdate(SegmentTaskIndexWrapper segmentTaskIndexWrapper,
-     UpdateVO updateDetails) {
-   return null != updateDetails.getLatestUpdateTimestamp()
-       && updateDetails.getLatestUpdateTimestamp()
-           > segmentTaskIndexWrapper.getRefreshedTimeStamp();
- }
-
- /**
-  * Returns the task-to-index map of one segment.
-  *
-  * <p>The map is taken from the segment cache when present. When it is absent, or the cached
-  * entry is older than the segment's latest update, the segment's data files are listed,
-  * turned into block infos and handed to the cache client, which builds the index.
-  *
-  * @param absoluteTableIdentifier identifies the table the segment belongs to
-  * @param tablePath used to locate the segment's data directory
-  * @param segmentId segment to load
-  * @param cacheClient gives access to the segment index cache
-  * @param updateStatusManager supplies update details and block validity
-  * @return the segment's index map as returned by the cache
-  * @throws IOException if listing files or loading the index fails
-  */
- private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(/*JobContext job,*/
- AbsoluteTableIdentifier absoluteTableIdentifier,
- CarbonTablePath tablePath,
- String segmentId,
- CacheClient cacheClient,
- SegmentUpdateStatusManager updateStatusManager) throws IOException {
- Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
- SegmentTaskIndexWrapper segmentTaskIndexWrapper = null;
- boolean isSegmentUpdated = false;
- // Task keys already present in the cache; stays null unless the cached segment was updated.
- Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys = null;
- TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
- new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
- segmentTaskIndexWrapper =
- cacheClient.getSegmentAccessClient().getIfPresent(tableSegmentUniqueIdentifier);
- UpdateVO updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId);
- if (null != segmentTaskIndexWrapper) {
- segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
- if (isSegmentUpdate(segmentTaskIndexWrapper, updateDetails)) {
- taskKeys = segmentIndexMap.keySet();
- isSegmentUpdated = true;
- }
- }
-
- // if segment tree is not loaded, load the segment tree
- if (segmentIndexMap == null || isSegmentUpdated) {
-
- List<FileStatus> fileStatusList = new LinkedList<FileStatus>();
- // NOTE(review): 'segs' is filled but never read afterwards.
- List<String> segs = new ArrayList<>();
- segs.add(segmentId);
-
- FileSystem fs = getFileStatusOfSegments(new String[]{segmentId}, tablePath, fileStatusList);
- List<InputSplit> splits = getSplit(fileStatusList, fs);
-
- // Drop splits whose path resolves to the invalid-segment id.
- List<FileSplit> carbonSplits = new ArrayList<>();
- for (InputSplit inputSplit : splits) {
- FileSplit fileSplit = (FileSplit) inputSplit;
- String segId = CarbonTablePath.DataPathUtil.getSegmentId(fileSplit.getPath().toString());//open question from the author: how should the separator be handled here?
- if (segId.equals(CarbonCommonConstants.INVALID_SEGMENT_ID)) {
- continue;
- }
- carbonSplits.add(fileSplit);
- }
-
- // Build a block info for every split that is still valid with respect to the update details.
- List<TableBlockInfo> tableBlockInfoList = new ArrayList<>();
- for (FileSplit inputSplit : carbonSplits) {
- if (isValidBlockBasedOnUpdateDetails(taskKeys, inputSplit, updateDetails, updateStatusManager, segmentId)) {
-
- BlockletInfos blockletInfos = new BlockletInfos(0, 0, 0);//this level we do not need blocklet info!!!! Is this a trick?
- tableBlockInfoList.add(
- new TableBlockInfo(inputSplit.getPath().toString(),
- inputSplit.getStart(),
- segmentId,
- inputSplit.getLocations(),
- inputSplit.getLength(),
- blockletInfos,
- ColumnarFormatVersion.valueOf(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION),
- null/*new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE)*/));//open question from the author: could this null cause an exception?
- }
- }
-
- Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>();
- segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList);
- // get Btree blocks for given segment
- tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
- tableSegmentUniqueIdentifier.setIsSegmentUpdated(isSegmentUpdated);
- segmentTaskIndexWrapper =
- cacheClient.getSegmentAccessClient().get(tableSegmentUniqueIdentifier);
- segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
- }
- return segmentIndexMap;
- }
-
- /**
-  * Decides whether the block behind {@code carbonInputSplit} should be (re)loaded into the
-  * segment index.
-  *
-  * @param taskKeys task/bucket keys already present in the cached index, or {@code null}
-  *     when nothing is cached
-  * @param carbonInputSplit split of the block's data file; {@code null} yields {@code false}
-  * @param updateDetails update timestamps of the segment
-  * @param updateStatusManager used to check block validity
-  * @param segmentId segment the block belongs to
-  * @return {@code false} for a null split or a block the update status manager reports as
-  *     invalid; {@code true} when {@code taskKeys} is null; otherwise {@code true} only if the
-  *     block is not older than the update-delta start timestamp (or no such timestamp exists)
-  *     and its task/bucket key is not yet in {@code taskKeys}
-  */
- private boolean isValidBlockBasedOnUpdateDetails(
- Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys, FileSplit carbonInputSplit,
- UpdateVO updateDetails, SegmentUpdateStatusManager updateStatusManager, String segmentId) {
- String taskID = null;
- if (null != carbonInputSplit) {
- if (!updateStatusManager.isBlockValid(segmentId, carbonInputSplit.getPath().getName())) {
- return false;
- }
-
- // Nothing cached yet: every valid block has to be loaded.
- if (null == taskKeys) {
- return true;
- }
-
- taskID = CarbonTablePath.DataFileUtil.getTaskNo(carbonInputSplit.getPath().getName());
- String bucketNo =
- CarbonTablePath.DataFileUtil.getBucketNo(carbonInputSplit.getPath().getName());
-
- SegmentTaskIndexStore.TaskBucketHolder taskBucketHolder =
- new SegmentTaskIndexStore.TaskBucketHolder(taskID, bucketNo);
-
- // The block timestamp is the part of the file name between the last '-' and the last '.'.
- String blockTimestamp = carbonInputSplit.getPath().getName()
- .substring(carbonInputSplit.getPath().getName().lastIndexOf('-') + 1,
- carbonInputSplit.getPath().getName().lastIndexOf('.'));
- // Skip blocks written before the update-delta start timestamp; of the rest, accept only
- // those whose task/bucket key is not cached yet.
- if (!(updateDetails.getUpdateDeltaStartTimestamp() != null
- && Long.parseLong(blockTimestamp) < updateDetails.getUpdateDeltaStartTimestamp())) {
- if (!taskKeys.contains(taskBucketHolder)) {
- return true;
- }
- }
- }
- return false;
- }
-
- /**
-  * Turns file statuses into input splits.
-  *
-  * <p>Empty files yield one zero-length split without hosts. Non-empty files are either kept
-  * as a single split (when {@link #isSplitable()} is false) or cut into block-sized splits,
-  * where the last split may be up to 10% larger than the split size.
-  *
-  * @param fileStatusList files to split
-  * @param targetSystem file system queried for block locations when a status does not
-  *     already carry them
-  * @return the splits, in file order
-  * @throws IOException if block locations cannot be fetched
-  */
- private List<InputSplit> getSplit(List<FileStatus> fileStatusList, FileSystem targetSystem) throws IOException {
-   List<InputSplit> splits = new ArrayList<>();
-
-   for (FileStatus file : fileStatusList) {
-     Path path = file.getPath();
-     long length = file.getLen();
-
-     if (length == 0L) {
-       splits.add(new FileSplit(path, 0L, length, new String[0]));
-       continue;
-     }
-
-     BlockLocation[] blkLocations;
-     if (file instanceof LocatedFileStatus) {
-       blkLocations = ((LocatedFileStatus) file).getBlockLocations();
-     } else {
-       blkLocations = targetSystem.getFileBlockLocations(file, 0L, length);
-     }
-
-     if (!this.isSplitable()) {
-       // NOTE(review): assumes at least one block location is reported for a non-empty file.
-       splits.add(new FileSplit(path, 0L, length, blkLocations[0].getHosts()));
-       continue;
-     }
-
-     long splitSize = this.computeSplitSize(file.getBlockSize(), 1, Long.MAX_VALUE);
-     long bytesRemaining = length;
-     // Emit full-size splits while more than 1.1 split sizes remain, so that a small tail
-     // is merged into the final split instead of becoming a tiny split of its own.
-     while ((double) bytesRemaining / (double) splitSize > 1.1D) {
-       int blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
-       splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts()));
-       bytesRemaining -= splitSize;
-     }
-
-     if (bytesRemaining != 0L) {
-       int blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
-       splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts()));
-     }
-   }
-
-   return splits;
- }
-
- /**
-  * Returns the partitions to scan; currently always the single partition "0".
-  */
- private String[] getValidPartitions() {
-   //TODO: has to Identify partitions by partition pruning
-   String[] partitions = { "0" };
-   return partitions;
- }
-
- /**
-  * Collects the CarbonData files of the given segments, for every valid partition, into
-  * {@code result}.
-  *
-  * @param segmentsToConsider ids of the segments to scan
-  * @param tablePath used to build each segment's data directory path
-  * @param result receives the file statuses that pass {@code DefaultFilter}
-  * @return the file system of the last segment path that was opened, or {@code null} if none
-  *     was opened
-  * @throws IOException if there is no partition to consider
-  */
- private FileSystem getFileStatusOfSegments(String[] segmentsToConsider,
-     CarbonTablePath tablePath,
-     List<FileStatus> result) throws IOException {
-   String[] partitionsToConsider = getValidPartitions();
-   if (0 == partitionsToConsider.length) {
-     throw new IOException("No partitions/data found");
-   }
-
-   FileSystem fs = null;
-
-   // Token acquisition and a job-specific data file filter are not wired in here.
-
-   // get all data files of valid partitions and segments
-   for (String partition : partitionsToConsider) {
-     for (String segmentId : segmentsToConsider) {
-       Path segmentPath = new Path(tablePath.getCarbonDataDirectoryPath(partition, segmentId));
-
-       try {
-         fs = segmentPath.getFileSystem(new Configuration());
-
-         RemoteIterator<LocatedFileStatus> entries = fs.listLocatedStatus(segmentPath);
-         while (entries.hasNext()) {
-           LocatedFileStatus stat = entries.next();
-           // See the carbonInputFilter implementation in carbondata for the reference filter.
-           if (!DefaultFilter.accept(stat.getPath())) {
-             continue;
-           }
-           if (stat.isDirectory()) {
-             addInputPathRecursively(result, fs, stat.getPath(), DefaultFilter);
-           } else {
-             result.add(stat);
-           }
-         }
-       } catch (Exception ex) {
-         // NOTE(review): failures are only printed, so a segment that cannot be listed is
-         // silently left out of the result - confirm this best-effort behaviour is intended.
-         System.out.println(ex.toString());
-       }
-     }
-   }
-   return fs;
- }
-
- /**
-  * Walks {@code path} recursively and appends every accepted file to {@code result}.
-  * Directories are descended into only when they pass {@code inputFilter} themselves.
-  *
-  * @param result receives the accepted file statuses
-  * @param fs file system used for listing
-  * @param path directory to list
-  * @param inputFilter filter applied to both files and directories
-  * @throws IOException if listing fails
-  */
- protected void addInputPathRecursively(List<FileStatus> result, FileSystem fs, Path path, PathFilter inputFilter) throws IOException {
-   RemoteIterator<LocatedFileStatus> entries = fs.listLocatedStatus(path);
-
-   while (entries.hasNext()) {
-     LocatedFileStatus entry = entries.next();
-     if (!inputFilter.accept(entry.getPath())) {
-       continue;
-     }
-     if (entry.isDirectory()) {
-       this.addInputPathRecursively(result, fs, entry.getPath(), inputFilter);
-     } else {
-       result.add(entry);
-     }
-   }
- }
-
- /**
-  * Returns all data blocks of the given B-tree index, from the first to the last data block
-  * found with the default start and end keys.
-  *
-  * @param abstractIndex index to enumerate
-  * @return the blocks in traversal order; empty if the default keys cannot be generated
-  */
- private List<DataRefNode> getDataBlocksOfIndex(AbstractIndex abstractIndex) {
-   List<DataRefNode> collected = new LinkedList<DataRefNode>();
-   SegmentProperties properties = abstractIndex.getSegmentProperties();
-
-   try {
-     IndexKey firstKey = FilterUtil.prepareDefaultStartIndexKey(properties);
-     IndexKey lastKey = FilterUtil.prepareDefaultEndIndexKey(properties);
-
-     DataRefNodeFinder finder =
-         new BTreeDataRefNodeFinder(properties.getEachDimColumnValueSize());
-     DataRefNode first = finder.findFirstDataBlock(abstractIndex.getDataRefNode(), firstKey);
-     DataRefNode last = finder.findLastDataBlock(abstractIndex.getDataRefNode(), lastKey);
-
-     // Follow the leaf chain up to, but excluding, the last block, then add the last block.
-     for (DataRefNode cursor = first; cursor != last; cursor = cursor.getNextDataRefNode()) {
-       collected.add(cursor);
-     }
-     collected.add(last);
-   } catch (KeyGenException e) {
-     System.out.println("Could not generate start key" + e.getMessage());
-   }
-   return collected;
- }
-
- /**
-  * Tells whether files may be cut into several splits.
-  *
-  * @return {@code false} when the store is on the local file system, {@code true} otherwise
-  */
- private boolean isSplitable() {
-   // Don't split the file if it is local file system
-   return this.fileType != FileFactory.FileType.LOCAL;
- }
-
- /**
-  * Clamps the block size into the range given by {@code minSize} and {@code maxSize}.
-  *
-  * @return {@code max(minSize, min(maxSize, blockSize))}
-  */
- private long computeSplitSize(long blockSize, long minSize,
-     long maxSize) {
-   long cappedAtMax = Math.min(maxSize, blockSize);
-   return Math.max(minSize, cappedAtMax);
- }
-
- /**
-  * Creates a {@code FileSplit} for the given byte range of a file.
-  *
-  * @param file path of the file
-  * @param start offset of the first byte of the split
-  * @param length number of bytes in the split
-  * @param hosts hosts passed on to the split
-  */
- private FileSplit makeSplit(Path file, long start, long length,
-     String[] hosts) {
-   FileSplit split = new FileSplit(file, start, length, hosts);
-   return split;
- }
-
- /**
-  * Finds the index of the block location that contains {@code offset}.
-  *
-  * @param blkLocations block locations of a file
-  * @param offset byte offset within the file
-  * @return index of the first location whose range contains the offset
-  * @throws IllegalArgumentException if no location contains the offset
-  */
- private int getBlockIndex(BlockLocation[] blkLocations,
-     long offset) {
-   for (int idx = 0; idx < blkLocations.length; idx++) {
-     long blockStart = blkLocations[idx].getOffset();
-     long blockEnd = blockStart + blkLocations[idx].getLength();
-     // is the offset inside this block?
-     if (blockStart <= offset && offset < blockEnd) {
-       return idx;
-     }
-   }
-
-   BlockLocation last = blkLocations[blkLocations.length - 1];
-   long fileLength = last.getOffset() + last.getLength() - 1;
-   throw new IllegalArgumentException("Offset " + offset +
-       " is outside of file (0.." +
-       fileLength + ")");
- }
-
-
- /**
-  * Intended to return the total number of rows, e.g. for count(*).
-  *
-  * <p>The counting logic is disabled (it is kept below as a comment), so this method
-  * currently always returns 0.
-  *
-  * @return always 0
-  * @throws IOException declared for the disabled implementation
-  * @throws IndexBuilderException declared for the disabled implementation
-  */
- public long getRowCount() throws IOException, IndexBuilderException {
- /*AbsoluteTableIdentifier absoluteTableIdentifier = this.carbonTable.getAbsoluteTableIdentifier();
-
- // no of core to load the blocks in driver
- //addSegmentsIfEmpty(job, absoluteTableIdentifier);
- int numberOfCores = CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT_DEFAULT_VALUE;
- try {
- numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT));
- } catch (NumberFormatException e) {
- numberOfCores = CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT_DEFAULT_VALUE;
- }
- // creating a thread pool
- ExecutorService threadPool = Executors.newFixedThreadPool(numberOfCores);
- List<Future<Map<String, AbstractIndex>>> loadedBlocks =
- new ArrayList<Future<Map<String, AbstractIndex>>>();
- //for each segment fetch blocks matching filter in Driver BTree
- for (String segmentNo : this.segmentList) {
- // submitting the task
- loadedBlocks
- .add(threadPool.submit(new BlocksLoaderThread(*//*job,*//* absoluteTableIdentifier, segmentNo)));
- }
- threadPool.shutdown();
- try {
- threadPool.awaitTermination(1, TimeUnit.HOURS);
- } catch (InterruptedException e) {
- throw new IndexBuilderException(e);
- }
- try {
- // adding all the rows of the blocks to get the total row
- // count
- for (Future<Map<String, AbstractIndex>> block : loadedBlocks) {
- for (AbstractIndex abstractIndex : block.get().values()) {
- rowCount += abstractIndex.getTotalNumberOfRows();
- }
- }
- } catch (InterruptedException | ExecutionException e) {
- throw new IndexBuilderException(e);
- }*/
-   return 0L;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java
new file mode 100755
index 0000000..f2f69d9
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.facebook.presto.spi.predicate.Domain;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSetter;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataColumnConstraint {
+ private final String name;
+ private final boolean invertedindexed;
+ private Optional<Domain> domain;
+
+ @JsonCreator
+ public CarbondataColumnConstraint(
+ @JsonProperty("name") String name,
+ @JsonProperty("domain") Optional<Domain> domain,
+ @JsonProperty("invertedindexed") boolean invertedindexed)
+ {
+ this.name = requireNonNull(name, "name is null");
+ this.invertedindexed = requireNonNull(invertedindexed, "invertedIndexed is null");
+ this.domain = requireNonNull(domain, "domain is null");
+ }
+
+ @JsonProperty
+ public boolean isInvertedindexed()
+ {
+ return invertedindexed;
+ }
+
+ @JsonProperty
+ public String getName()
+ {
+ return name;
+ }
+
+ @JsonProperty
+ public Optional<Domain> getDomain()
+ {
+ return domain;
+ }
+
+ @JsonSetter
+ public void setDomain(Optional<Domain> domain)
+ {
+ this.domain = domain;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(name, domain, invertedindexed);
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj) {
+ return true;
+ }
+
+ if ((obj == null) || (getClass() != obj.getClass())) {
+ return false;
+ }
+
+ CarbondataColumnConstraint other = (CarbondataColumnConstraint) obj;
+ return Objects.equals(this.name, other.name)
+ && Objects.equals(this.domain, other.domain)
+ && Objects.equals(this.invertedindexed, other.invertedindexed);
+ }
+
+ @Override
+ public String toString()
+ {
+ return toStringHelper(this)
+ .add("name", this.name)
+ .add("invertedindexed", this.invertedindexed)
+ .add("domain", this.domain)
+ .toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
new file mode 100755
index 0000000..252556a
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.type.Type;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Column handle of the Carbondata connector. It is an immutable value object
+ * that is created from, and written to, JSON through the annotated
+ * constructor and getters.
+ *
+ * Equality and hashing use only connectorId and columnName.
+ */
+public class CarbondataColumnHandle
+        implements ColumnHandle
+{
+    private final String connectorId;
+    private final String columnName;
+    private final Type columnType;
+
+    // Position of the column within the full column list of its table.
+    // It does not depend on the query: for
+    // "select clm3, clm0, clm1 from tablename" the handle of clm3 still has
+    // ordinalPosition = 3.
+    private final int ordinalPosition;
+    private final int keyOrdinal;
+    private final int columnGroupOrdinal;
+
+    private final boolean isMeasure;
+    private final int columnGroupId;
+    // May be null: the constructor deliberately does not null-check it.
+    private final String columnUniqueId;
+    private final boolean isInvertedIndex;
+
+    /**
+     * Creates a handle from its JSON properties.
+     *
+     * connectorId, columnName and columnType must not be null
+     * (NullPointerException otherwise); columnUniqueId is accepted as null.
+     */
+    @JsonCreator
+    public CarbondataColumnHandle(
+            @JsonProperty("connectorId") String connectorId,
+            @JsonProperty("columnName") String columnName,
+            @JsonProperty("columnType") Type columnType,
+            @JsonProperty("ordinalPosition") int ordinalPosition,
+            @JsonProperty("keyOrdinal") int keyOrdinal,
+            @JsonProperty("columnGroupOrdinal") int columnGroupOrdinal,
+            @JsonProperty("isMeasure") boolean isMeasure,
+            @JsonProperty("columnGroupId") int columnGroupId,
+            @JsonProperty("columnUniqueId") String columnUniqueId,
+            @JsonProperty("isInvertedIndex") boolean isInvertedIndex)
+    {
+        this.connectorId = requireNonNull(connectorId, "connectorId is null");
+        this.columnName = requireNonNull(columnName, "columnName is null");
+        this.columnType = requireNonNull(columnType, "columnType is null");
+
+        // Primitive values can never be null, so they are assigned directly
+        // instead of being boxed through requireNonNull.
+        this.ordinalPosition = ordinalPosition;
+        this.keyOrdinal = keyOrdinal;
+        this.columnGroupOrdinal = columnGroupOrdinal;
+
+        this.isMeasure = isMeasure;
+        this.columnGroupId = columnGroupId;
+        this.columnUniqueId = columnUniqueId;
+        this.isInvertedIndex = isInvertedIndex;
+    }
+
+    @JsonProperty
+    public String getConnectorId()
+    {
+        return connectorId;
+    }
+
+    @JsonProperty
+    public String getColumnName()
+    {
+        return columnName;
+    }
+
+    @JsonProperty
+    public Type getColumnType()
+    {
+        return columnType;
+    }
+
+    @JsonProperty
+    public int getOrdinalPosition()
+    {
+        return ordinalPosition;
+    }
+
+    // The getters below are annotated so that every property the
+    // @JsonCreator constructor reads is also written on serialization.
+    // Without the annotation a value is either dropped or emitted under a
+    // different name, depending on the mapper's auto-detection settings.
+
+    @JsonProperty
+    public int getKeyOrdinal()
+    {
+        return keyOrdinal;
+    }
+
+    @JsonProperty
+    public int getColumnGroupOrdinal()
+    {
+        return columnGroupOrdinal;
+    }
+
+    // Named explicitly: bean naming would derive "measure" from isMeasure(),
+    // which does not match the "isMeasure" property the constructor expects.
+    @JsonProperty("isMeasure")
+    public boolean isMeasure()
+    {
+        return isMeasure;
+    }
+
+    @JsonProperty
+    public int getColumnGroupId()
+    {
+        return columnGroupId;
+    }
+
+    @JsonProperty
+    public String getColumnUniqueId()
+    {
+        return columnUniqueId;
+    }
+
+    // Named explicitly for the same reason as isMeasure().
+    @JsonProperty("isInvertedIndex")
+    public boolean isInvertedIndex()
+    {
+        return isInvertedIndex;
+    }
+
+    /**
+     * Builds column metadata from this handle's name and type, passing null
+     * and false for the remaining two constructor arguments.
+     */
+    public ColumnMetadata getColumnMetadata()
+    {
+        return new ColumnMetadata(columnName, columnType, null, false);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(connectorId, columnName);
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+        if (this == obj) {
+            return true;
+        }
+        if ((obj == null) || (getClass() != obj.getClass())) {
+            return false;
+        }
+
+        CarbondataColumnHandle other = (CarbondataColumnHandle) obj;
+        return Objects.equals(this.connectorId, other.connectorId) &&
+                Objects.equals(this.columnName, other.columnName);
+    }
+
+    @Override
+    public String toString()
+    {
+        return toStringHelper(this)
+                .add("connectorId", connectorId)
+                .add("columnName", columnName)
+                .add("columnType", columnType)
+                .add("ordinalPosition", ordinalPosition)
+                .toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
new file mode 100755
index 0000000..90b4944
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.facebook.presto.spi.connector.*;
+import com.facebook.presto.spi.transaction.IsolationLevel;
+import io.airlift.bootstrap.LifeCycleManager;
+import io.airlift.log.Logger;
+
+import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED;
+import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Connector implementation that hands out the metadata, split manager and
+ * record set provider it was constructed with.
+ */
+public class CarbondataConnector
+        implements Connector
+{
+    private static final Logger log = Logger.get(CarbondataConnector.class);
+
+    private final LifeCycleManager lifeCycleManager;
+    private final CarbondataMetadata metadata;
+    private final ConnectorSplitManager splitManager;
+    private final ConnectorRecordSetProvider recordSetProvider;
+    private final ClassLoader classLoader;
+
+    /** Stores the collaborators; every argument must be non-null. */
+    public CarbondataConnector(
+            LifeCycleManager lifeCycleManager,
+            CarbondataMetadata metadata,
+            ConnectorSplitManager splitManager,
+            ConnectorRecordSetProvider recordSetProvider,
+            ClassLoader classLoader)
+    {
+        this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
+        this.metadata = requireNonNull(metadata, "metadata is null");
+        this.splitManager = requireNonNull(splitManager, "splitManager is null");
+        this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
+        this.classLoader = requireNonNull(classLoader, "classLoader is null");
+    }
+
+    /**
+     * Validates the requested isolation level against READ_COMMITTED and
+     * returns the shared transaction handle. The readOnly flag is not used.
+     */
+    @Override
+    public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly)
+    {
+        checkConnectorSupports(READ_COMMITTED, isolationLevel);
+        return CarbondataTransactionHandle.INSTANCE;
+    }
+
+    /**
+     * Returns the metadata object. The connector's class loader is handed to
+     * it on every call before it is returned.
+     */
+    @Override
+    public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle)
+    {
+        metadata.putClassLoader(classLoader);
+        return metadata;
+    }
+
+    @Override
+    public ConnectorSplitManager getSplitManager()
+    {
+        return splitManager;
+    }
+
+    @Override
+    public ConnectorRecordSetProvider getRecordSetProvider()
+    {
+        return recordSetProvider;
+    }
+
+    /**
+     * Stops the life cycle manager. Any exception raised while stopping is
+     * logged and not rethrown.
+     */
+    @Override
+    public final void shutdown()
+    {
+        try {
+            lifeCycleManager.stop();
+        }
+        catch (Exception e) {
+            log.error(e, "Error shutting down connector");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
new file mode 100755
index 0000000..324699c
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.facebook.presto.spi.ConnectorHandleResolver;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.facebook.presto.spi.connector.*;
+import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager;
+import com.google.common.base.Throwables;
+import com.google.inject.Injector;
+import io.airlift.bootstrap.Bootstrap;
+import io.airlift.bootstrap.LifeCycleManager;
+import io.airlift.json.JsonModule;
+
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Factory that bootstraps a {@link CarbondataConnector} from a connector id
+ * and its configuration map.
+ */
+public class CarbondataConnectorFactory
+        implements ConnectorFactory
+{
+    private final String name;
+    private final ClassLoader classLoader;
+
+    /**
+     * @param connectorName name reported by {@link #getName()}; must not be null
+     * @param classLoader class loader installed as the thread context class
+     *        loader while a connector is created; must not be null
+     */
+    public CarbondataConnectorFactory(String connectorName, ClassLoader classLoader)
+    {
+        // Checked like classLoader so a missing name fails here rather than
+        // surfacing later as a null returned from getName().
+        this.name = requireNonNull(connectorName, "connectorName is null");
+        this.classLoader = requireNonNull(classLoader, "classLoader is null");
+    }
+
+    @Override
+    public String getName()
+    {
+        return name;
+    }
+
+    @Override
+    public ConnectorHandleResolver getHandleResolver()
+    {
+        return new CarbondataHandleResolver();
+    }
+
+    /**
+     * Initializes the injector with the given configuration and assembles
+     * the connector from the instances it provides.
+     *
+     * @throws NullPointerException if config is null
+     * @throws RuntimeException any failure during bootstrap, propagated
+     *         through {@code Throwables.propagate}
+     */
+    @Override
+    public Connector create(String connectorId, Map<String, String> config, ConnectorContext context)
+    {
+        requireNonNull(config, "config is null");
+
+        // The thread context class loader is switched for the duration of
+        // the bootstrap and restored when the try block exits.
+        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+            Bootstrap app = new Bootstrap(
+                    new JsonModule(),
+                    new CarbondataModule(connectorId, context.getTypeManager()));
+
+            Injector injector = app
+                    .strictConfig()
+                    .doNotInitializeLogging()
+                    .setRequiredConfigurationProperties(config)
+                    .initialize();
+
+            LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class);
+            CarbondataMetadata metadata = injector.getInstance(CarbondataMetadata.class);
+            ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
+            ConnectorRecordSetProvider connectorRecordSet = injector.getInstance(ConnectorRecordSetProvider.class);
+
+            // Only the split manager is wrapped in a class-loader-safe
+            // decorator; the record set provider is passed through as is.
+            return new CarbondataConnector(
+                    lifeCycleManager,
+                    metadata,
+                    new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader),
+                    connectorRecordSet,
+                    classLoader);
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java
new file mode 100755
index 0000000..5aa72f1
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.google.inject.Inject;
+
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Immutable wrapper around the connector id string. Its string form is the
+ * id itself, and equality and hashing are based on the id alone.
+ */
+public class CarbondataConnectorId
+{
+    private final String id;
+
+    /** Wraps the given id, which must not be null. */
+    @Inject
+    public CarbondataConnectorId(String id)
+    {
+        this.id = requireNonNull(id, "id is null");
+    }
+
+    /** Equal only to another instance of exactly this class with an equal id. */
+    @Override
+    public boolean equals(Object obj)
+    {
+        if (obj == this) {
+            return true;
+        }
+        if (obj == null || obj.getClass() != getClass()) {
+            return false;
+        }
+
+        CarbondataConnectorId that = (CarbondataConnectorId) obj;
+        return Objects.equals(id, that.id);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(id);
+    }
+
+    /** Returns the wrapped id unchanged. */
+    @Override
+    public String toString()
+    {
+        return id;
+    }
+}
\ No newline at end of file