You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2017/05/24 10:49:08 UTC

[5/5] carbondata git commit: resolved rebase conflict for presto conflict

resolved rebase conflict for presto conflict


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5f6f1a5c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5f6f1a5c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5f6f1a5c

Branch: refs/heads/master
Commit: 5f6f1a5c443face7a3abcca836cc33a805d0dba7
Parents: 0988a84
Author: chenliang613 <ch...@apache.org>
Authored: Wed May 24 18:43:01 2017 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Wed May 24 18:43:01 2017 +0800

----------------------------------------------------------------------
 .../presto/CarbondataColumnHandle.java          |  25 ++-
 .../carbondata/presto/CarbondataConnector.java  |  83 +++++++++
 .../presto/CarbondataConnectorFactory.java      |   5 +-
 .../carbondata/presto/CarbondataMetadata.java   |  23 +--
 .../carbondata/presto/CarbondataModule.java     |  81 +++++++++
 .../carbondata/presto/CarbondataPageSource.java | 178 +++++++++++++++++++
 .../presto/CarbondataPageSourceProvider.java    |  50 ++++++
 .../presto/CarbondataRecordCursor.java          |  60 ++++++-
 .../carbondata/presto/CarbondataRecordSet.java  |   8 +-
 .../presto/CarbondataRecordSetProvider.java     |  12 +-
 10 files changed, 499 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
index b9152b5..4a9b7ed 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
@@ -46,6 +46,14 @@ public class CarbondataColumnHandle implements ColumnHandle {
   private final String columnUniqueId;
   private final boolean isInvertedIndex;
 
+  /**
+   * Used when this column contains decimal data.
+   */
+  private int scale;
+
+  private int precision;
+
+
   public boolean isMeasure() {
     return isMeasure;
   }
@@ -76,7 +84,9 @@ public class CarbondataColumnHandle implements ColumnHandle {
       @JsonProperty("isMeasure") boolean isMeasure,
       @JsonProperty("columnGroupId") int columnGroupId,
       @JsonProperty("columnUniqueId") String columnUniqueId,
-      @JsonProperty("isInvertedIndex") boolean isInvertedIndex) {
+      @JsonProperty("isInvertedIndex") boolean isInvertedIndex,
+      @JsonProperty("precision") int precision,
+      @JsonProperty("scale") int scale) {
     this.connectorId = requireNonNull(connectorId, "connectorId is null");
     this.columnName = requireNonNull(columnName, "columnName is null");
     this.columnType = requireNonNull(columnType, "columnType is null");
@@ -89,6 +99,8 @@ public class CarbondataColumnHandle implements ColumnHandle {
     this.columnGroupId = requireNonNull(columnGroupId, "columnGroupId is null");
     this.columnUniqueId = columnUniqueId;//requireNonNull(columnUniqueId, "columnUniqueId is null");
     this.isInvertedIndex = requireNonNull(isInvertedIndex, "isInvertedIndex is null");
+    this.precision = precision;
+    this.scale = scale;
   }
 
   @JsonProperty public String getConnectorId() {
@@ -132,4 +144,15 @@ public class CarbondataColumnHandle implements ColumnHandle {
     return toStringHelper(this).add("connectorId", connectorId).add("columnName", columnName)
         .add("columnType", columnType).add("ordinalPosition", ordinalPosition).toString();
   }
+
+  @JsonProperty public int getScale() {
+    return scale;
+  }
+
+  @JsonProperty public int getPrecision() {
+    return precision;
+  }
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
new file mode 100755
index 0000000..406ed93
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.facebook.presto.spi.connector.*;
+import com.facebook.presto.spi.transaction.IsolationLevel;
+import io.airlift.bootstrap.LifeCycleManager;
+import io.airlift.log.Logger;
+
+import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED;
+import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataConnector implements Connector {
+
+  private static final Logger log = Logger.get(CarbondataConnector.class);
+
+  private final LifeCycleManager lifeCycleManager;
+  private final CarbondataMetadata metadata;
+  private final ConnectorSplitManager splitManager;
+  private final ConnectorRecordSetProvider recordSetProvider;
+  private final ClassLoader classLoader;
+  private final ConnectorPageSourceProvider pageSourceProvider;
+
+  public CarbondataConnector(LifeCycleManager lifeCycleManager, CarbondataMetadata metadata,
+      ConnectorSplitManager splitManager, ConnectorRecordSetProvider recordSetProvider,
+      ClassLoader classLoader, ConnectorPageSourceProvider pageSourceProvider) {
+    this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
+    this.metadata = requireNonNull(metadata, "metadata is null");
+    this.splitManager = requireNonNull(splitManager, "splitManager is null");
+    this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
+    this.classLoader = requireNonNull(classLoader, "classLoader is null");
+    this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
+  }
+
+  @Override public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel,
+      boolean readOnly) {
+    checkConnectorSupports(READ_COMMITTED, isolationLevel);
+    return CarbondataTransactionHandle.INSTANCE;
+  }
+
+  @Override public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle) {
+    metadata.putClassLoader(classLoader);
+    return metadata;
+  }
+
+  @Override public ConnectorSplitManager getSplitManager() {
+    return splitManager;
+  }
+
+  @Override public ConnectorRecordSetProvider getRecordSetProvider() {
+    return recordSetProvider;
+  }
+
+  @Override
+  public ConnectorPageSourceProvider getPageSourceProvider()
+  {
+    return pageSourceProvider;
+  }
+
+  @Override public final void shutdown() {
+    try {
+      lifeCycleManager.stop();
+    } catch (Exception e) {
+      log.error(e, "Error shutting down connector");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
index d1c8082..d97f19e 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.presto;
 import com.facebook.presto.spi.ConnectorHandleResolver;
 import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
 import com.facebook.presto.spi.connector.*;
+import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorPageSourceProvider;
 import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager;
 import com.google.common.base.Throwables;
 import com.google.inject.Injector;
@@ -70,10 +71,12 @@ public class CarbondataConnectorFactory implements ConnectorFactory {
       ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
       ConnectorRecordSetProvider connectorRecordSet =
           injector.getInstance(ConnectorRecordSetProvider.class);
+       ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class);
 
       return new CarbondataConnector(lifeCycleManager, metadata,
           new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader), connectorRecordSet,
-          classLoader
+          classLoader,
+          new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader)
       );
     } catch (Exception e) {
       throw Throwables.propagate(e);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
index f2d594a..7701490 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
@@ -123,9 +123,8 @@ public class CarbondataMetadata implements ConnectorMetadata {
     List<CarbonColumn> carbonColumns = carbonTable.getCreateOrderColumn(schemaTableName.getTableName());
     for (CarbonColumn col : carbonColumns) {
       //show columns command will return these data
-      Type columnType = CarbondataType2SpiMapper(col.getColumnSchema().getDataType());
-      ColumnMetadata columnMeta = new ColumnMetadata(col.getColumnSchema().getColumnName(),
-          columnType);
+      Type columnType = CarbondataType2SpiMapper(col.getColumnSchema());
+      ColumnMetadata columnMeta = new ColumnMetadata(col.getColumnSchema().getColumnName(), columnType);
       columnsMetaList.add(columnMeta);
     }
 
@@ -162,21 +161,21 @@ public class CarbondataMetadata implements ConnectorMetadata {
       column.getNumberOfChild();
       column.getListOfChildDimensions();
 
-      Type spiType = CarbondataType2SpiMapper(cs.getDataType());
+      Type spiType = CarbondataType2SpiMapper(cs);
       columnHandles.put(cs.getColumnName(),
           new CarbondataColumnHandle(connectorId, cs.getColumnName(), spiType, column.getSchemaOrdinal(),
               column.getKeyOrdinal(), column.getColumnGroupOrdinal(), false, cs.getColumnGroupId(),
-              cs.getColumnUniqueId(), cs.isUseInvertedIndex()));
+              cs.getColumnUniqueId(), cs.isUseInvertedIndex(), cs.getPrecision(), cs.getScale()));
     }
 
     for (CarbonMeasure measure : cb.getMeasureByTableName(tableName)) {
       ColumnSchema cs = measure.getColumnSchema();
 
-      Type spiType = CarbondataType2SpiMapper(cs.getDataType());
+      Type spiType = CarbondataType2SpiMapper(cs);
       columnHandles.put(cs.getColumnName(),
           new CarbondataColumnHandle(connectorId, cs.getColumnName(), spiType, cs.getSchemaOrdinal(),
               measure.getOrdinal(), cs.getColumnGroupId(), true, cs.getColumnGroupId(),
-              cs.getColumnUniqueId(), cs.isUseInvertedIndex()));
+              cs.getColumnUniqueId(), cs.isUseInvertedIndex(), cs.getPrecision(), cs.getScale()));
     }
 
     //should i cache it?
@@ -230,7 +229,8 @@ public class CarbondataMetadata implements ConnectorMetadata {
     return getTableMetadata(carbondataTableHandle.getSchemaTableName());
   }
 
-  public static Type CarbondataType2SpiMapper(DataType colType) {
+  public static Type CarbondataType2SpiMapper(ColumnSchema columnSchema) {
+    DataType colType = columnSchema.getDataType();
     switch (colType) {
       case BOOLEAN:
         return BooleanType.BOOLEAN;
@@ -243,9 +243,12 @@ public class CarbondataMetadata implements ConnectorMetadata {
       case FLOAT:
       case DOUBLE:
         return DoubleType.DOUBLE;
-
       case DECIMAL:
-        return DecimalType.createDecimalType();
+        if(columnSchema.getPrecision() > 0){
+          return DecimalType.createDecimalType(columnSchema.getPrecision(), columnSchema.getScale());
+        } else {
+          return DecimalType.createDecimalType();
+        }
       case STRING:
         return VarcharType.VARCHAR;
       case DATE:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
new file mode 100755
index 0000000..1d8b2b2
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+import org.apache.carbondata.presto.impl.CarbonTableReader;
+
+import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
+import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.type.Type;
+import com.facebook.presto.spi.type.TypeManager;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer;
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Scopes;
+
+import javax.inject.Inject;
+
+import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature;
+import static com.google.common.base.Preconditions.checkArgument;
+import static io.airlift.configuration.ConfigBinder.configBinder;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataModule implements Module {
+
+  private final String connectorId;
+  private final TypeManager typeManager;
+
+  public CarbondataModule(String connectorId, TypeManager typeManager) {
+    this.connectorId = requireNonNull(connectorId, "connector id is null");
+    this.typeManager = requireNonNull(typeManager, "typeManager is null");
+  }
+
+  @Override public void configure(Binder binder) {
+    binder.bind(TypeManager.class).toInstance(typeManager);
+
+    binder.bind(CarbondataConnectorId.class).toInstance(new CarbondataConnectorId(connectorId));
+    binder.bind(CarbondataMetadata.class).in(Scopes.SINGLETON);
+    binder.bind(CarbonTableReader.class).in(Scopes.SINGLETON);
+    binder.bind(ConnectorSplitManager.class).to(CarbondataSplitManager.class).in(Scopes.SINGLETON);
+    binder.bind(ConnectorRecordSetProvider.class).to(CarbondataRecordSetProvider.class)
+        .in(Scopes.SINGLETON);
+    binder.bind(ConnectorPageSourceProvider.class).to(CarbondataPageSourceProvider.class)
+        .in(Scopes.SINGLETON);
+    binder.bind(CarbondataHandleResolver.class).in(Scopes.SINGLETON);
+    binder.bind(CarbondataRecordSetProvider.class).in(Scopes.SINGLETON);
+    configBinder(binder).bindConfig(CarbonTableConfig.class);
+  }
+
+  public static final class TypeDeserializer extends FromStringDeserializer<Type> {
+    private final TypeManager typeManager;
+
+    @Inject public TypeDeserializer(TypeManager typeManager) {
+      super(Type.class);
+      this.typeManager = requireNonNull(typeManager, "typeManager is null");
+    }
+
+    @Override protected Type _deserialize(String value, DeserializationContext context) {
+      Type type = typeManager.getType(parseTypeSignature(value));
+      checkArgument(type != null, "Unknown type %s", value);
+      return type;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
new file mode 100644
index 0000000..7c50c66
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.facebook.presto.spi.ConnectorPageSource;
+import com.facebook.presto.spi.Page;
+import com.facebook.presto.spi.PageBuilder;
+import com.facebook.presto.spi.RecordCursor;
+import com.facebook.presto.spi.RecordSet;
+import com.facebook.presto.spi.block.BlockBuilder;
+import com.facebook.presto.spi.type.DecimalType;
+import com.facebook.presto.spi.type.Type;
+import io.airlift.slice.Slice;
+
+import static com.facebook.presto.spi.type.Decimals.encodeUnscaledValue;
+import static com.facebook.presto.spi.type.Decimals.isShortDecimal;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.math.RoundingMode.HALF_UP;
+import static java.util.Collections.unmodifiableList;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Carbondata Page Source class for custom Carbondata RecordSet Iteration.
+ */
+public class CarbondataPageSource implements ConnectorPageSource {
+
+  private static final int ROWS_PER_REQUEST = 4096;
+  private final RecordCursor cursor;
+  private final List<Type> types;
+  private final PageBuilder pageBuilder;
+  private boolean closed;
+  private final char[] buffer = new char[100];
+
+  public CarbondataPageSource(RecordSet recordSet)
+  {
+    this(requireNonNull(recordSet, "recordSet is null").getColumnTypes(), recordSet.cursor());
+  }
+
+  public CarbondataPageSource(List<Type> types, RecordCursor cursor)
+  {
+    this.cursor = requireNonNull(cursor, "cursor is null");
+    this.types = unmodifiableList(new ArrayList<>(requireNonNull(types, "types is null")));
+    this.pageBuilder = new PageBuilder(this.types);
+  }
+
+  public RecordCursor getCursor()
+  {
+    return cursor;
+  }
+
+  @Override public long getTotalBytes() {
+    return cursor.getTotalBytes();
+  }
+
+  @Override public long getCompletedBytes() {
+    return cursor.getCompletedBytes();
+  }
+
+  @Override public long getReadTimeNanos() {
+    return cursor.getReadTimeNanos();
+  }
+
+  @Override public boolean isFinished() {
+    return closed && pageBuilder.isEmpty();
+  }
+
+  @Override public Page getNextPage() {
+    if (!closed) {
+      int i;
+      for (i = 0; i < ROWS_PER_REQUEST; i++) {
+        if (pageBuilder.isFull()) {
+          break;
+        }
+        if (!cursor.advanceNextPosition()) {
+          closed = true;
+          break;
+        }
+
+        pageBuilder.declarePosition();
+        for (int column = 0; column < types.size(); column++) {
+          BlockBuilder output = pageBuilder.getBlockBuilder(column);
+          if (cursor.isNull(column)) {
+            output.appendNull();
+          } else {
+            Type type = types.get(column);
+            Class<?> javaType = type.getJavaType();
+            if (javaType == boolean.class) {
+              type.writeBoolean(output, cursor.getBoolean(column));
+            } else if (javaType == long.class) {
+              type.writeLong(output, cursor.getLong(column));
+            } else if (javaType == double.class) {
+              type.writeDouble(output, cursor.getDouble(column));
+            } else if (javaType == Slice.class) {
+              Slice slice = cursor.getSlice(column);
+              if(type instanceof  DecimalType)
+              {
+                if (isShortDecimal(type)) {
+                  type.writeLong(output, parseLong((DecimalType) type, slice, 0, slice.length()));
+                } else {
+                  type.writeSlice(output, parseSlice((DecimalType) type, slice, 0, slice.length()));
+                }
+              } else {
+                type.writeSlice(output, slice, 0, slice.length());
+              }
+            } else {
+              type.writeObject(output, cursor.getObject(column));
+            }
+          }
+        }
+      }
+    }
+
+    // only return a page if the buffer is full or we are finishing
+    if (pageBuilder.isEmpty() || (!closed && !pageBuilder.isFull())) {
+      return null;
+    }
+    Page page = pageBuilder.build();
+    pageBuilder.reset();
+    return page;
+ }
+
+  @Override public long getSystemMemoryUsage() {
+    return cursor.getSystemMemoryUsage() + pageBuilder.getSizeInBytes();
+  }
+
+  @Override public void close() throws IOException {
+    closed = true;
+    cursor.close();
+
+  }
+
+  private long parseLong(DecimalType type, Slice slice, int offset, int length)
+  {
+    BigDecimal decimal = parseBigDecimal(type, slice, offset, length);
+    return decimal.unscaledValue().longValue();
+  }
+
+
+  private Slice parseSlice(DecimalType type, Slice slice, int offset, int length)
+  {
+    BigDecimal decimal = parseBigDecimal(type, slice, offset, length);
+    return encodeUnscaledValue(decimal.unscaledValue());
+  }
+
+  private BigDecimal parseBigDecimal(DecimalType type, Slice slice, int offset, int length)
+  {
+    checkArgument(length < buffer.length);
+    for (int i = 0; i < length; i++) {
+      buffer[i] = (char) slice.getByte(offset + i);
+    }
+    BigDecimal decimal = new BigDecimal(buffer, 0, length);
+    checkState(decimal.scale() <= type.getScale(), "Read decimal value scale larger than column scale");
+    decimal = decimal.setScale(type.getScale(), HALF_UP);
+    checkState(decimal.precision() <= type.getPrecision(), "Read decimal precision larger than column precision");
+    return decimal;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
new file mode 100644
index 0000000..46d8611
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.util.List;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorPageSource;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.google.inject.Inject;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Provider Class for Carbondata Page Source class.
+ */
+public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider {
+
+  private CarbondataRecordSetProvider carbondataRecordSetProvider;
+
+  @Inject
+  public CarbondataPageSourceProvider(CarbondataRecordSetProvider carbondataRecordSetProvider)
+  {
+    this.carbondataRecordSetProvider = requireNonNull(carbondataRecordSetProvider, "recordSetProvider is null");
+  }
+
+  @Override
+  public ConnectorPageSource createPageSource(ConnectorTransactionHandle transactionHandle,
+      ConnectorSession session, ConnectorSplit split, List<ColumnHandle> columns) {
+    return new CarbondataPageSource(carbondataRecordSetProvider.getRecordSet(transactionHandle, session, split, columns));
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
index ad47f75..2e97dc0 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
@@ -18,7 +18,14 @@
 package org.apache.carbondata.presto;
 
 import com.facebook.presto.spi.RecordCursor;
+import com.facebook.presto.spi.type.DecimalType;
+import com.facebook.presto.spi.type.Decimals;
+import com.facebook.presto.spi.type.TimestampType;
 import com.facebook.presto.spi.type.Type;
+import com.facebook.presto.spi.block.Block;
+import com.facebook.presto.spi.block.BlockBuilder;
+import com.facebook.presto.spi.block.BlockBuilderStatus;
+
 import com.google.common.base.Strings;
 import io.airlift.log.Logger;
 import io.airlift.slice.Slice;
@@ -26,16 +33,22 @@ import io.airlift.slice.Slices;
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
 
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+
 
 import static com.facebook.presto.spi.type.BooleanType.BOOLEAN;
+import static com.facebook.presto.spi.type.Decimals.isShortDecimal;
+import static com.facebook.presto.spi.type.Decimals.rescale;
 import static com.facebook.presto.spi.type.DoubleType.DOUBLE;
 import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
+import static io.airlift.slice.Slices.utf8Slice;
 
 public class CarbondataRecordCursor implements RecordCursor {
 
@@ -114,8 +127,10 @@ public class CarbondataRecordCursor implements RecordCursor {
 
   @Override public long getLong(int field) {
     String timeStr = getFieldValue(field);
-    Long milliSec = 0L;
-
+    Type actual = getType(field);
+    if(actual instanceof TimestampType){
+      return new Timestamp(Long.parseLong(timeStr)).getTime()/1000;
+    }
     //suppose the
     return Math.round(Double.parseDouble(getFieldValue(field)));
   }
@@ -126,8 +141,41 @@ public class CarbondataRecordCursor implements RecordCursor {
   }
 
   @Override public Slice getSlice(int field) {
-    checkFieldType(field, VARCHAR);
-    return Slices.utf8Slice(getFieldValue(field));
+    Type decimalType = getType(field);
+    if (decimalType instanceof DecimalType) {
+      DecimalType actual = (DecimalType) decimalType;
+      CarbondataColumnHandle carbondataColumnHandle = columnHandles.get(field);
+      if(carbondataColumnHandle.getPrecision() > 0 ) {
+        checkFieldType(field, DecimalType.createDecimalType(carbondataColumnHandle.getPrecision(), carbondataColumnHandle.getScale()));
+      } else {
+        checkFieldType(field, DecimalType.createDecimalType());
+      }
+      String fieldValue = getFieldValue(field);
+      BigDecimal bigDecimalValue = new BigDecimal(fieldValue);
+      if (isShortDecimal(decimalType)) {
+        return utf8Slice(Decimals.toString(bigDecimalValue.longValue(), actual.getScale()));
+      } else {
+        if (bigDecimalValue.scale() > actual.getScale()) {
+          BigInteger unscaledDecimal =
+              rescale(bigDecimalValue.unscaledValue(), bigDecimalValue.scale(),
+                  bigDecimalValue.scale());
+          Slice decimalSlice = Decimals.encodeUnscaledValue(unscaledDecimal);
+          return utf8Slice(Decimals.toString(decimalSlice, actual.getScale()));
+          //return decimalSlice;
+        } else {
+          BigInteger unscaledDecimal =
+              rescale(bigDecimalValue.unscaledValue(), bigDecimalValue.scale(), actual.getScale());
+          Slice decimalSlice = Decimals.encodeUnscaledValue(unscaledDecimal);
+          return utf8Slice(Decimals.toString(decimalSlice, actual.getScale()));
+          //return decimalSlice;
+
+        }
+
+      }
+    } else {
+      checkFieldType(field, VARCHAR);
+      return utf8Slice(getFieldValue(field));
+    }
   }
 
   @Override public Object getObject(int field) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
index d3fd7a0..7bf0e84 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
@@ -99,12 +99,10 @@ public class CarbondataRecordSet implements RecordSet {
       RecordCursor rc = new CarbondataRecordCursor(readSupport, carbonIterator, columns, split);
       return rc;
     } catch (QueryExecutionException e) {
-      //throw new InterruptedException(e.getMessage());
-      System.out.println(e.getMessage());
-    } catch (Exception ex) {
-      System.out.println(ex.toString());
+       throw new RuntimeException(e.getMessage(), e);
+   } catch (Exception ex) {
+      throw new RuntimeException(ex.getMessage(), ex);
     }
-    return null;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
index f0958c7..a9652cc 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
@@ -129,7 +129,7 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
       CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
       Type type = cdch.getColumnType();
 
-      DataType coltype = Spi2CarbondataTypeMapper(type);
+      DataType coltype = Spi2CarbondataTypeMapper(cdch);
       Expression colExpression = new ColumnExpression(cdch.getColumnName(), coltype);
 
       domain = originalConstraint.getDomains().get().get(c);
@@ -200,6 +200,10 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
         if (coltype.equals(DataType.STRING)) {
           ex = new EqualToExpression(colExpression,
               new LiteralExpression(((Slice) singleValues.get(0)).toStringUtf8(), coltype));
+        } else if (coltype.equals(DataType.TIMESTAMP) || coltype.equals(DataType.DATE)) {
+          Long value = (Long) singleValues.get(0) * 1000;
+          ex = new EqualToExpression(colExpression,
+              new LiteralExpression(value , coltype));
         } else ex = new EqualToExpression(colExpression,
             new LiteralExpression(singleValues.get(0), coltype));
         filters.add(ex);
@@ -241,16 +245,18 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
         CarbonInputFormatUtil.resolveFilter(finalFilters, queryModel.getAbsoluteTableIdentifier()));
   }
 
-  public static DataType Spi2CarbondataTypeMapper(Type colType) {
+  public static DataType Spi2CarbondataTypeMapper(CarbondataColumnHandle carbondataColumnHandle) {
+    Type colType = carbondataColumnHandle.getColumnType();
     if (colType == BooleanType.BOOLEAN) return DataType.BOOLEAN;
     else if (colType == SmallintType.SMALLINT) return DataType.SHORT;
     else if (colType == IntegerType.INTEGER) return DataType.INT;
     else if (colType == BigintType.BIGINT) return DataType.LONG;
     else if (colType == DoubleType.DOUBLE) return DataType.DOUBLE;
-    else if (colType == DecimalType.createDecimalType()) return DataType.DECIMAL;
     else if (colType == VarcharType.VARCHAR) return DataType.STRING;
     else if (colType == DateType.DATE) return DataType.DATE;
     else if (colType == TimestampType.TIMESTAMP) return DataType.TIMESTAMP;
+    else if (colType == DecimalType.createDecimalType(carbondataColumnHandle.getPrecision(),
+        carbondataColumnHandle.getScale())) return DataType.DECIMAL;
     else return DataType.STRING;
   }