You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/09/14 09:20:28 UTC
[35/54] [abbrv] carbondata git commit: [CARBONDATA-1469] Optimizations for Presto Integration
[CARBONDATA-1469] Optimizations for Presto Integration
This closes #1348
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/1551a7c7
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/1551a7c7
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/1551a7c7
Branch: refs/heads/streaming_ingest
Commit: 1551a7c7d4046964a299d01a927b2900a84dc2f3
Parents: 0ab928e
Author: Bhavya <bh...@knoldus.com>
Authored: Mon Sep 11 16:33:07 2017 +0530
Committer: CHEN LIANG <ch...@huawei.com>
Committed: Tue Sep 12 07:08:37 2017 +0800
----------------------------------------------------------------------
integration/presto/pom.xml | 536 ++++++++++++-------
.../carbondata/presto/PrestoFilterUtil.java | 75 ++-
.../readers/DecimalSliceStreamReader.java | 58 +-
.../presto/readers/DoubleStreamReader.java | 27 +-
.../presto/readers/IntegerStreamReader.java | 28 +-
.../presto/readers/LongStreamReader.java | 27 +-
.../presto/readers/ShortStreamReader.java | 80 +++
.../presto/readers/StreamReaders.java | 6 +
.../presto/readers/TimestampStreamReader.java | 79 +++
9 files changed, 682 insertions(+), 234 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1551a7c7/integration/presto/pom.xml
----------------------------------------------------------------------
diff --git a/integration/presto/pom.xml b/integration/presto/pom.xml
index 562718f..617ce93 100644
--- a/integration/presto/pom.xml
+++ b/integration/presto/pom.xml
@@ -15,7 +15,9 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
@@ -37,49 +39,223 @@
<dependencies>
<dependency>
- <groupId>org.apache.thrift</groupId>
- <artifactId>libthrift</artifactId>
- <version>0.9.3</version>
- </dependency>
-
- <dependency>
<groupId>org.apache.carbondata</groupId>
- <artifactId>carbondata-core</artifactId>
+ <artifactId>carbondata-hadoop</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.10</artifactId>
+ <artifactId>spark-network-shuffle_2.11</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sketch_2.11</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.java.dev.jets3t</groupId>
+ <artifactId>jets3t</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>javax.servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-math3</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr4-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.esotericsoftware</groupId>
+ <artifactId>minlog</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.janino</groupId>
+ <artifactId>janino</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.jpountz.lz4</groupId>
+ <artifactId>lz4</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.sf.py4j</groupId>
+ <artifactId>py4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.spark-project.spark</groupId>
+ <artifactId>unused</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-tags_2.11</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-column</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.jersey.core</groupId>
+ <artifactId>jersey-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.jersey.core</groupId>
+ <artifactId>jersey-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.jersey.core</groupId>
+ <artifactId>jersey-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.jersey.containers</groupId>
+ <artifactId>jersey-container-servlet</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.jersey.containers</groupId>
+ <artifactId>jersey-container-servlet-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.jersey.containers</groupId>
+ <artifactId>jersey-container-servlet-core</artifactId>
</exclusion>
- </exclusions>
- </dependency>
-
-
- <dependency>
- <groupId>org.apache.carbondata</groupId>
- <artifactId>carbondata-common</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.carbondata</groupId>
- <artifactId>carbondata-processing</artifactId>
- <version>${project.version}</version>
- <exclusions>
+ <exclusion>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-mapred</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.twitter</groupId>
+ <artifactId>chill_2.11</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-jvm</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-json</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-graphite</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.java.dev</groupId>
+ <artifactId>jets3t</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.xbean</groupId>
+ <artifactId>xbean-asm5-shaded</artifactId>
+ </exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.10</artifactId>
+ <artifactId>spark-launcher_2.11</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-network-common_2.11</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.ning</groupId>
+ <artifactId>compress-lzf</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.roaringbitmap</groupId>
+ <artifactId>RoaringBitmap</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.thoughtworks.paranamer</groupId>
+ <artifactId>paranamer</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scalap</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.scala-lang..modules</groupId>
+ <artifactId>parser-combinators_2.11</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.scala-lang..modules</groupId>
+ <artifactId>scala-xml_2.11</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_2.11</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.sf.py4</groupId>
+ <artifactId>py4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.razorvine</groupId>
+ <artifactId>pyrolite</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.clearspring.analytics</groupId>
+ <artifactId>stream</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.ivy</groupId>
+ <artifactId>ivy</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>oro</groupId>
+ <artifactId>oro</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
- <groupId>org.apache.carbondata</groupId>
- <artifactId>carbondata-hadoop</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
<groupId>io.airlift</groupId>
<artifactId>bootstrap</artifactId>
<version>0.144</version>
@@ -87,6 +263,38 @@
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>aopalliance</groupId>
+ <artifactId>aopalliance</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.weakref</groupId>
+ <artifactId>jmxutils</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>cglib</groupId>
+ <artifactId>cglib-nodep</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
</exclusions>
@@ -98,21 +306,6 @@
<version>0.144</version>
<!--<scope>provided</scope>-->
</dependency>
-
- <dependency>
- <groupId>io.airlift</groupId>
- <artifactId>log</artifactId>
- <version>0.144</version>
- <!--<scope>provided</scope>-->
- </dependency>
-
- <dependency>
- <groupId>io.airlift</groupId>
- <artifactId>slice</artifactId>
- <version>0.29</version>
- <scope>provided</scope>
- </dependency>
-
<dependency>
<groupId>io.airlift</groupId>
<artifactId>units</artifactId>
@@ -126,19 +319,6 @@
<version>2.6.0</version>
<scope>provided</scope>
</dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>18.0</version>
- </dependency>
-
- <dependency>
- <groupId>com.google.inject</groupId>
- <artifactId>guice</artifactId>
- <version>3.0</version>
- </dependency>
-
<!--presto integrated-->
<dependency>
<groupId>com.facebook.presto</groupId>
@@ -146,152 +326,140 @@
<version>${presto.version}</version>
<scope>provided</scope>
</dependency>
-
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <version>2.5</version>
+ </dependency>
<dependency>
<groupId>com.facebook.presto.hadoop</groupId>
<artifactId>hadoop-apache2</artifactId>
<version>2.7.3-1</version>
</dependency>
-
<dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.11</artifactId>
- <version>2.1.0</version>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ <version>1.4.1</version>
<exclusions>
<exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
+ <groupId>org.tukaani</groupId>
+ <artifactId>xz</artifactId>
</exclusion>
</exclusions>
</dependency>
+
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-catalyst_2.10 -->
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-catalyst_2.11</artifactId>
- <version>2.1.0</version>
- </dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.11</artifactId>
- <version>2.1.0</version>
- <exclusions>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
</dependencies>
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>1.8</source>
- <target>1.8</target>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>2.18</version>
- <!-- Note config is repeated in scalatest config -->
- <configuration>
- <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
- <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
- <systemProperties>
- <java.awt.headless>true</java.awt.headless>
- </systemProperties>
- <failIfNoTests>false</failIfNoTests>
- </configuration>
- </plugin>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.18</version>
+ <!-- Note config is repeated in scalatest config -->
+ <configuration>
+ <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+ <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
+ <systemProperties>
+ <java.awt.headless>true</java.awt.headless>
+ </systemProperties>
+ <failIfNoTests>false</failIfNoTests>
+ </configuration>
+ </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-checkstyle-plugin</artifactId>
- <version>2.17</version>
- <configuration>
- <skip>true</skip>
- </configuration>
- </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.17</version>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-enforcer-plugin</artifactId>
- <version>1.4.1</version>
- <configuration>
- <skip>true</skip>
- </configuration>
- </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <version>1.4.1</version>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
- <plugin>
- <groupId>com.ning.maven.plugins</groupId>
- <artifactId>maven-dependency-versions-check-plugin</artifactId>
- <configuration>
- <skip>true</skip>
- <failBuildInCaseOfConflict>false</failBuildInCaseOfConflict>
- </configuration>
- </plugin>
+ <plugin>
+ <groupId>com.ning.maven.plugins</groupId>
+ <artifactId>maven-dependency-versions-check-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ <failBuildInCaseOfConflict>false</failBuildInCaseOfConflict>
+ </configuration>
+ </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <configuration>
- <skip>false</skip>
- </configuration>
- </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <skip>false</skip>
+ </configuration>
+ </plugin>
- <plugin>
- <groupId>com.ning.maven.plugins</groupId>
- <artifactId>maven-duplicate-finder-plugin</artifactId>
- <configuration>
- <skip>true</skip>
- </configuration>
- </plugin>
+ <plugin>
+ <groupId>com.ning.maven.plugins</groupId>
+ <artifactId>maven-duplicate-finder-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
- <plugin>
- <groupId>io.takari.maven.plugins</groupId>
- <artifactId>presto-maven-plugin</artifactId>
- <version>0.1.12</version>
- <extensions>true</extensions>
- </plugin>
+ <plugin>
+ <groupId>io.takari.maven.plugins</groupId>
+ <artifactId>presto-maven-plugin</artifactId>
+ <version>0.1.12</version>
+ <extensions>true</extensions>
+ </plugin>
- <plugin>
- <groupId>pl.project13.maven</groupId>
- <artifactId>git-commit-id-plugin</artifactId>
- <configuration>
- <skip>true</skip>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.scala-tools</groupId>
- <artifactId>maven-scala-plugin</artifactId>
- <version>2.15.2</version>
- <executions>
- <execution>
- <id>compile</id>
- <goals>
- <goal>compile</goal>
- </goals>
- <phase>compile</phase>
- </execution>
- <execution>
- <id>testCompile</id>
- <goals>
- <goal>testCompile</goal>
- </goals>
- <phase>test</phase>
- </execution>
- <execution>
- <phase>process-resources</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
+ <plugin>
+ <groupId>pl.project13.maven</groupId>
+ <artifactId>git-commit-id-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <version>2.15.2</version>
+ <executions>
+ <execution>
+ <id>compile</id>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <phase>compile</phase>
+ </execution>
+ <execution>
+ <id>testCompile</id>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ <phase>test</phase>
+ </execution>
+ <execution>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1551a7c7/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java
index 9a5a5cb..a958e63 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java
@@ -17,6 +17,8 @@
package org.apache.carbondata.presto;
+import java.math.BigDecimal;
+import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
@@ -75,8 +77,8 @@ public class PrestoFilterUtil {
else if (colType == VarcharType.VARCHAR) return DataType.STRING;
else if (colType == DateType.DATE) return DataType.DATE;
else if (colType == TimestampType.TIMESTAMP) return DataType.TIMESTAMP;
- else if (colType == DecimalType.createDecimalType(carbondataColumnHandle.getPrecision(),
- carbondataColumnHandle.getScale())) return DataType.DECIMAL;
+ else if (colType.equals(DecimalType.createDecimalType(carbondataColumnHandle.getPrecision(),
+ carbondataColumnHandle.getScale()))) return DataType.DECIMAL;
else return DataType.STRING;
}
@@ -104,13 +106,12 @@ public class PrestoFilterUtil {
checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
List<Object> singleValues = new ArrayList<>();
- List<Expression> disjuncts = new ArrayList<>();
+ Map<Object, List<Expression>> valueExpressionMap = new HashMap<>();
for (Range range : domain.getValues().getRanges().getOrderedRanges()) {
if (range.isSingleValue()) {
Object value = ConvertDataByType(range.getLow().getValue(), type);
singleValues.add(value);
} else {
- List<Expression> rangeConjuncts = new ArrayList<>();
if (!range.getLow().isLowerUnbounded()) {
Object value = ConvertDataByType(range.getLow().getValue(), type);
switch (range.getLow().getBound()) {
@@ -120,14 +121,20 @@ public class PrestoFilterUtil {
} else {
GreaterThanExpression greater = new GreaterThanExpression(colExpression,
new LiteralExpression(value, coltype));
- rangeConjuncts.add(greater);
+ if(valueExpressionMap.get(value) == null) {
+ valueExpressionMap.put(value, new ArrayList<>());
+ }
+ valueExpressionMap.get(value).add(greater);
}
break;
case EXACTLY:
GreaterThanEqualToExpression greater =
new GreaterThanEqualToExpression(colExpression,
new LiteralExpression(value, coltype));
- rangeConjuncts.add(greater);
+ if(valueExpressionMap.get(value) == null) {
+ valueExpressionMap.put(value, new ArrayList<>());
+ }
+ valueExpressionMap.get(value).add(greater);
break;
case BELOW:
throw new IllegalArgumentException("Low marker should never use BELOW bound");
@@ -143,18 +150,23 @@ public class PrestoFilterUtil {
case EXACTLY:
LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression,
new LiteralExpression(value, coltype));
- rangeConjuncts.add(less);
+ if(valueExpressionMap.get(value) == null) {
+ valueExpressionMap.put(value, new ArrayList<>());
+ }
+ valueExpressionMap.get(value).add(less);
break;
case BELOW:
LessThanExpression less2 =
new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
- rangeConjuncts.add(less2);
+ if(valueExpressionMap.get(value) == null) {
+ valueExpressionMap.put(value, new ArrayList<>());
+ }
+ valueExpressionMap.get(value).add(less2);
break;
default:
throw new AssertionError("Unhandled bound: " + range.getHigh().getBound());
}
}
- disjuncts.addAll(rangeConjuncts);
}
}
if (singleValues.size() == 1) {
@@ -174,19 +186,34 @@ public class PrestoFilterUtil {
.map((a) -> new LiteralExpression(ConvertDataByType(a, type), coltype))
.collect(Collectors.toList());
candidates = new ListExpression(exs);
-
filters.add(new InExpression(colExpression, candidates));
- } else if (disjuncts.size() > 0) {
- if (disjuncts.size() > 1) {
- Expression finalFilters = new OrExpression(disjuncts.get(0), disjuncts.get(1));
- if (disjuncts.size() > 2) {
- for (int i = 2; i < disjuncts.size(); i++) {
- filters.add(new AndExpression(finalFilters, disjuncts.get(i)));
+ } else if (valueExpressionMap.size() > 0) {
+ List<Expression> valuefilters = new ArrayList<>();
+ Expression finalFilters = null;
+ List<Expression> expressions;
+ for (Map.Entry<Object, List<Expression>> entry : valueExpressionMap.entrySet()) {
+ expressions = valueExpressionMap.get(entry.getKey());
+ if (expressions.size() == 1) {
+ finalFilters = expressions.get(0);
+ } else if (expressions.size() >= 2) {
+ finalFilters = new OrExpression(expressions.get(0), expressions.get(1));
+ for (int i = 2; i < expressions.size(); i++) {
+ finalFilters = new OrExpression(finalFilters, expressions.get(i));
}
- } else {
- filters.add(finalFilters);
}
- } else if (disjuncts.size() == 1) filters.add(disjuncts.get(0));
+ valuefilters.add(finalFilters);
+ }
+
+ if(valuefilters.size() == 1){
+ finalFilters = valuefilters.get(0);
+ } else if (valuefilters.size() >= 2) {
+ finalFilters = new AndExpression(valuefilters.get(0), valuefilters.get(1));
+ for (int i = 2; i < valuefilters.size() ; i++) {
+ finalFilters = new AndExpression(finalFilters, valuefilters.get(i));
+ }
+ }
+
+ filters.add(finalFilters);
}
}
@@ -196,7 +223,7 @@ public class PrestoFilterUtil {
finalFilters = new AndExpression(tmp.get(0), tmp.get(1));
if (tmp.size() > 2) {
for (int i = 2; i < tmp.size(); i++) {
- finalFilters = new OrExpression(finalFilters, tmp.get(i));
+ finalFilters = new AndExpression(finalFilters, tmp.get(i));
}
}
} else if (tmp.size() == 1) finalFilters = tmp.get(0);
@@ -223,6 +250,14 @@ public class PrestoFilterUtil {
Date date = c.getTime();
return date.getTime() * 1000;
}
+ else if (type instanceof DecimalType) {
+ if(rawdata instanceof Double) {
+ return new BigDecimal((Double) rawdata);
+ } else if (rawdata instanceof Long) {
+ return new BigDecimal(new BigInteger(String.valueOf(rawdata)),
+ ((DecimalType) type).getScale());
+ }
+ }
return rawdata;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1551a7c7/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
index 89d4e60..6612ab0 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
@@ -66,20 +66,17 @@ public class DecimalSliceStreamReader extends AbstractStreamReader {
int scale = ((DecimalType)type).getScale();
int precision = ((DecimalType)type).getPrecision();
if (columnVector != null) {
- for(int i = 0; i < numberOfRows ; i++ ){
- if(columnVector.isNullAt(i)) {
- builder.appendNull();
+ if(columnVector.anyNullsSet())
+ {
+ handleNullInVector(type, numberOfRows, builder, scale, precision);
+ } else {
+ if(isShortDecimal(type)) {
+ populateShortDecimalVector(type, numberOfRows, builder, scale, precision);
} else {
- Slice slice =
- getSlice(columnVector.getDecimal(i, precision, scale).toJavaBigDecimal(), type);
- if (isShortDecimal(type)) {
- type.writeLong(builder, parseLong((DecimalType) type, slice, 0, slice.length()));
- } else {
- type.writeSlice(builder, parseSlice((DecimalType) type, slice, 0, slice.length()));
- }
+ populateLongDecimalVector(type, numberOfRows, builder, scale, precision);
}
}
- }
+ }
} else {
if (streamData != null) {
@@ -182,4 +179,43 @@ public class DecimalSliceStreamReader extends AbstractStreamReader {
return decimal;
}
+
+ private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder, int scale,
+ int precision) {
+ for (int i = 0; i < numberOfRows; i++) {
+ if (columnVector.isNullAt(i)) {
+ builder.appendNull();
+ } else {
+ if (isShortDecimal(type)) {
+ long rescaledDecimal = Decimals
+ .rescale(columnVector.getDecimal(i, precision, scale).toLong(),
+ columnVector.getDecimal(i, precision, scale).scale(), scale);
+ type.writeLong(builder, rescaledDecimal);
+ } else {
+ Slice slice =
+ getSlice(columnVector.getDecimal(i, precision, scale).toJavaBigDecimal(), type);
+ type.writeSlice(builder, parseSlice((DecimalType) type, slice, 0, slice.length()));
+ }
+ }
+ }
+ }
+
+ private void populateShortDecimalVector(Type type, int numberOfRows, BlockBuilder builder,
+ int scale, int precision) {
+ for (int i = 0; i < numberOfRows; i++) {
+ BigDecimal decimalValue = columnVector.getDecimal(i, precision, scale).toJavaBigDecimal();
+ long rescaledDecimal = Decimals.rescale(decimalValue.unscaledValue().longValue(),
+ decimalValue.scale(), scale);
+ type.writeLong(builder, rescaledDecimal);
+ }
+ }
+
+ private void populateLongDecimalVector(Type type, int numberOfRows, BlockBuilder builder,
+ int scale, int precision) {
+ for (int i = 0; i < numberOfRows; i++) {
+ Slice slice = getSlice(columnVector.getDecimal(i, precision, scale).toJavaBigDecimal(), type);
+ type.writeSlice(builder, parseSlice((DecimalType) type, slice, 0, slice.length()));
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1551a7c7/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
index cacf5ce..2b90a8d 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
@@ -47,12 +47,11 @@ public class DoubleStreamReader extends AbstractStreamReader {
numberOfRows = batchSize;
builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
if (columnVector != null) {
- for (int i = 0; i < numberOfRows; i++) {
- if (columnVector.isNullAt(i)) {
- builder.appendNull();
- } else {
- type.writeDouble(builder, columnVector.getDouble(i));
- }
+ if(columnVector.anyNullsSet()) {
+ handleNullInVector(type, numberOfRows, builder);
+ }
+ else {
+ populateVector(type, numberOfRows, builder);
}
}
} else {
@@ -68,4 +67,20 @@ public class DoubleStreamReader extends AbstractStreamReader {
return builder.build();
}
+ private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) {
+ for (int i = 0; i < numberOfRows; i++) {
+ if (columnVector.isNullAt(i)) {
+ builder.appendNull();
+ } else {
+ type.writeDouble(builder, columnVector.getDouble(i));
+ }
+ }
+ }
+
+ private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
+ for (int i = 0; i < numberOfRows; i++) {
+ type.writeDouble(builder, columnVector.getDouble(i));
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1551a7c7/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
index 13280c8..ccc0192 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
@@ -41,13 +41,11 @@ public class IntegerStreamReader extends AbstractStreamReader {
numberOfRows = batchSize;
builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
if (columnVector != null) {
- for(int i = 0; i < numberOfRows ; i++ ){
- if(columnVector.isNullAt(i)){
- builder.appendNull();
- } else {
- type.writeLong(builder, ((Integer)columnVector.getInt(i)).longValue());
- }
-
+ if(columnVector.anyNullsSet()) {
+ handleNullInVector(type, numberOfRows, builder);
+ }
+ else {
+ populateVector(type, numberOfRows, builder);
}
}
@@ -64,4 +62,20 @@ public class IntegerStreamReader extends AbstractStreamReader {
return builder.build();
}
+ private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) {
+ for (int i = 0; i < numberOfRows; i++) {
+ if (columnVector.isNullAt(i)) {
+ builder.appendNull();
+ } else {
+ type.writeLong(builder, ((Integer) columnVector.getInt(i)).longValue());
+ }
+ }
+ }
+
+ private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
+ for (int i = 0; i < numberOfRows; i++) {
+ type.writeLong(builder, columnVector.getInt(i));
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1551a7c7/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
index 9d602a6..5081b32 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
@@ -37,12 +37,11 @@ public class LongStreamReader extends AbstractStreamReader {
numberOfRows = batchSize;
builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
if (columnVector != null) {
- for (int i = 0; i < numberOfRows; i++) {
- if (columnVector.isNullAt(i)) {
- builder.appendNull();
- } else {
- type.writeLong(builder, columnVector.getLong(i));
- }
+ if(columnVector.anyNullsSet()) {
+ handleNullInVector(type, numberOfRows, builder);
+ }
+ else {
+ populateVector(type, numberOfRows, builder);
}
}
@@ -59,4 +58,28 @@ public class LongStreamReader extends AbstractStreamReader {
 return builder.build();
 }
+  /**
+   * Appends {@code numberOfRows} longs from {@code columnVector} to {@code builder},
+   * emitting a null entry for each row flagged null in the vector.
+   */
+  private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) {
+    for (int row = 0; row < numberOfRows; row++) {
+      if (!columnVector.isNullAt(row)) {
+        type.writeLong(builder, columnVector.getLong(row));
+        continue;
+      }
+      builder.appendNull();
+    }
+  }
+
+  /**
+   * Appends {@code numberOfRows} longs from {@code columnVector} to {@code builder}
+   * without consulting null flags; intended for batches that contain no nulls.
+   */
+  private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
+    for (int row = 0; row < numberOfRows; row++) {
+      type.writeLong(builder, columnVector.getLong(row));
+    }
+  }
+
 }
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1551a7c7/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
new file mode 100644
index 0000000..59d8e96
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.readers;
+
+import java.io.IOException;
+
+import com.facebook.presto.spi.block.Block;
+import com.facebook.presto.spi.block.BlockBuilder;
+import com.facebook.presto.spi.block.BlockBuilderStatus;
+import com.facebook.presto.spi.type.Type;
+
+/**
+ * Reads SMALLINT column values and materializes them into a Presto {@link Block}.
+ *
+ * <p>Values come from the column vector when {@code isVectorReader} is set, otherwise from
+ * the {@code streamData} array, whose elements are expected to be {@link Short}s.
+ */
+public class ShortStreamReader extends AbstractStreamReader {
+
+  public ShortStreamReader() {
+  }
+
+  /**
+   * Builds a block from the current batch.
+   *
+   * @param type Presto type used to create the builder and write each value
+   * @return a block holding the batch's values; empty when no data source is available
+   * @throws IOException declared by the signature; not thrown directly by this method body
+   */
+  public Block readBlock(Type type) throws IOException {
+    int numberOfRows;
+    BlockBuilder builder;
+    if (isVectorReader) {
+      numberOfRows = batchSize;
+      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
+      if (columnVector != null) {
+        if (columnVector.anyNullsSet()) {
+          handleNullInVector(type, numberOfRows, builder);
+        } else {
+          populateVector(type, numberOfRows, builder);
+        }
+      }
+    } else {
+      // Check streamData for null before reading its length so a missing
+      // array yields an empty block instead of a NullPointerException.
+      numberOfRows = streamData == null ? 0 : streamData.length;
+      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
+      for (int i = 0; i < numberOfRows; i++) {
+        type.writeLong(builder, (Short) streamData[i]);
+      }
+    }
+    return builder.build();
+  }
+
+  /**
+   * Writes {@code numberOfRows} shorts from {@code columnVector} into {@code builder},
+   * appending a null entry for each row the vector marks as null.
+   */
+  private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) {
+    for (int i = 0; i < numberOfRows; i++) {
+      if (columnVector.isNullAt(i)) {
+        builder.appendNull();
+      } else {
+        type.writeLong(builder, columnVector.getShort(i));
+      }
+    }
+  }
+
+  /**
+   * Writes {@code numberOfRows} shorts from {@code columnVector} into {@code builder}
+   * without null checks; called when the vector reports no nulls.
+   */
+  private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
+    for (int i = 0; i < numberOfRows; i++) {
+      type.writeLong(builder, columnVector.getShort(i));
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1551a7c7/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java
index abd8787..86f863a 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java
@@ -23,6 +23,8 @@ import com.facebook.presto.spi.block.SliceArrayBlock;
import com.facebook.presto.spi.type.DateType;
import com.facebook.presto.spi.type.DecimalType;
import com.facebook.presto.spi.type.IntegerType;
+import com.facebook.presto.spi.type.SmallintType;
+import com.facebook.presto.spi.type.TimestampType;
import com.facebook.presto.spi.type.Type;
import io.airlift.slice.Slice;
@@ -44,6 +46,10 @@ public final class StreamReaders {
return new IntegerStreamReader();
} else if (type instanceof DecimalType) {
return new DecimalSliceStreamReader();
+ } else if (type instanceof SmallintType) {
+ return new ShortStreamReader();
+ } else if (type instanceof TimestampType) {
+ return new TimestampStreamReader();
}
return new LongStreamReader();
} else if (javaType == double.class) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1551a7c7/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
new file mode 100644
index 0000000..8ea3efb
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.readers;
+
+import java.io.IOException;
+
+import com.facebook.presto.spi.block.Block;
+import com.facebook.presto.spi.block.BlockBuilder;
+import com.facebook.presto.spi.block.BlockBuilderStatus;
+import com.facebook.presto.spi.type.Type;
+
+/**
+ * Reads TIMESTAMP column values and materializes them into a Presto {@link Block}.
+ *
+ * <p>Vector values are divided by {@code TIMESTAMP_DIVISOR} (rounding toward negative
+ * infinity) before being written; {@code streamData} values, expected to be {@link Long}s,
+ * are written unchanged.
+ */
+public class TimestampStreamReader extends AbstractStreamReader {
+
+  /**
+   * Divisor applied to every vector value. NOTE(review): presumably converts microseconds
+   * to milliseconds; confirm the unit stored in the column vector.
+   */
+  private static final long TIMESTAMP_DIVISOR = 1000L;
+
+  public TimestampStreamReader() {
+  }
+
+  /**
+   * Builds a block from the current batch.
+   *
+   * @param type Presto type used to create the builder and write each value
+   * @return a block holding the batch's values; empty when no data source is available
+   * @throws IOException declared by the signature; not thrown directly by this method body
+   */
+  public Block readBlock(Type type) throws IOException {
+    int numberOfRows;
+    BlockBuilder builder;
+    if (isVectorReader) {
+      numberOfRows = batchSize;
+      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
+      if (columnVector != null) {
+        if (columnVector.anyNullsSet()) {
+          handleNullInVector(type, numberOfRows, builder);
+        } else {
+          populateVector(type, numberOfRows, builder);
+        }
+      }
+    } else {
+      // Check streamData for null before reading its length so a missing
+      // array yields an empty block instead of a NullPointerException.
+      numberOfRows = streamData == null ? 0 : streamData.length;
+      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
+      // NOTE(review): unlike the vector path, these values are not scaled by
+      // TIMESTAMP_DIVISOR; confirm streamData already holds the target unit.
+      for (int i = 0; i < numberOfRows; i++) {
+        type.writeLong(builder, (Long) streamData[i]);
+      }
+    }
+    return builder.build();
+  }
+
+  /**
+   * Writes {@code numberOfRows} scaled values from {@code columnVector} into
+   * {@code builder}, appending a null entry for each row the vector marks as null.
+   */
+  private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) {
+    for (int i = 0; i < numberOfRows; i++) {
+      if (columnVector.isNullAt(i)) {
+        builder.appendNull();
+      } else {
+        type.writeLong(builder, scale(columnVector.getLong(i)));
+      }
+    }
+  }
+
+  /**
+   * Writes {@code numberOfRows} scaled values from {@code columnVector} into
+   * {@code builder} without null checks; called when the vector reports no nulls.
+   */
+  private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
+    for (int i = 0; i < numberOfRows; i++) {
+      type.writeLong(builder, scale(columnVector.getLong(i)));
+    }
+  }
+
+  /**
+   * Divides a raw vector value by {@code TIMESTAMP_DIVISOR}, rounding toward negative
+   * infinity so negative (pre-epoch) values with a remainder are not rounded up.
+   */
+  private static long scale(long rawValue) {
+    return Math.floorDiv(rawValue, TIMESTAMP_DIVISOR);
+  }
+
+}