You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by jb...@apache.org on 2016/06/23 14:15:52 UTC
[04/56] [abbrv] incubator-carbondata git commit: [Issue 618]Supported
Spark 1.6 in Carbondata (#670)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/sum/SumBigDecimalAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/sum/SumBigDecimalAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/sum/SumBigDecimalAggregator.java
new file mode 100644
index 0000000..b901878
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/sum/SumBigDecimalAggregator.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.sum;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.core.util.DataTypeUtil;
+import org.carbondata.query.aggregator.MeasureAggregator;
+import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorBasic;
+
+public class SumBigDecimalAggregator extends AbstractMeasureAggregatorBasic {
+
+ /**
+ * serialVersionUID
+ */
+ private static final long serialVersionUID = 623750056131364540L;
+
+ /**
+ * aggregate value
+ */
+ private BigDecimal aggVal;
+
+ public SumBigDecimalAggregator() {
+ aggVal = new BigDecimal(0);
+ firstTime = false;
+ }
+
+ /**
+ * This method will update the aggVal it will add new value to aggVal
+ *
+ * @param newVal new value
+ */
+ @Override public void agg(Object newVal) {
+ if (firstTime) {
+ aggVal = (BigDecimal) newVal;
+ firstTime = false;
+ } else {
+ aggVal = aggVal.add((BigDecimal) newVal);
+ }
+ }
+
+ @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+ if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+ BigDecimal value = dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index);
+ aggVal = aggVal.add(value);
+ firstTime = false;
+ }
+ }
+
+ /**
+ * Below method will be used to get the value byte array
+ */
+ @Override public byte[] getByteArray() {
+ if (firstTime) {
+ return new byte[0];
+ }
+ byte[] bytes = DataTypeUtil.bigDecimalToByte(aggVal);
+ ByteBuffer allocate = ByteBuffer.allocate(4 + bytes.length);
+
+ allocate.putInt(bytes.length);
+ allocate.put(bytes);
+ allocate.rewind();
+ return allocate.array();
+ }
+
+ /**
+ * This method will return aggVal
+ *
+ * @return sum value
+ */
+ @Override public BigDecimal getBigDecimalValue() {
+ return aggVal;
+ }
+
+ /* Merge the value, it will update the sum aggregate value it will add new
+ * value to aggVal
+ *
+ * @param aggregator
+ * SumAggregator
+ *
+ */
+ @Override public void merge(MeasureAggregator aggregator) {
+ if (!aggregator.isFirstTime()) {
+ agg(aggregator.getBigDecimalValue());
+ }
+ }
+
+ /**
+ * This method return the sum value as an object
+ *
+ * @return sum value as an object
+ */
+ @Override public Object getValueObject() {
+ return aggVal;
+ }
+
+ @Override public void setNewValue(Object newValue) {
+ aggVal = (BigDecimal) newValue;
+ }
+
+ @Override public void readData(DataInput inPut) throws IOException {
+ firstTime = inPut.readBoolean();
+ aggVal = new BigDecimal(inPut.readUTF());
+ }
+
+ @Override public void writeData(DataOutput output) throws IOException {
+ output.writeBoolean(firstTime);
+ output.writeUTF(aggVal.toString());
+
+ }
+
+ @Override public MeasureAggregator getCopy() {
+ SumBigDecimalAggregator aggr = new SumBigDecimalAggregator();
+ aggr.aggVal = aggVal;
+ aggr.firstTime = firstTime;
+ return aggr;
+ }
+
+ @Override public void merge(byte[] value) {
+ if (0 == value.length) {
+ return;
+ }
+
+ ByteBuffer buffer = ByteBuffer.wrap(value);
+ byte[] valueByte = new byte[buffer.getInt()];
+ buffer.get(valueByte);
+ BigDecimal valueBigDecimal = DataTypeUtil.byteToBigDecimal(valueByte);
+ aggVal = aggVal.add(valueBigDecimal);
+ firstTime = false;
+ }
+
+ public String toString() {
+ return aggVal + "";
+ }
+
+ @Override public int compareTo(MeasureAggregator o) {
+ BigDecimal value = getBigDecimalValue();
+ BigDecimal otherVal = o.getBigDecimalValue();
+ return value.compareTo(otherVal);
+ }
+
+ @Override public boolean equals(Object obj) {
+ if (!(obj instanceof SumBigDecimalAggregator)) {
+ return false;
+ }
+ SumBigDecimalAggregator o = (SumBigDecimalAggregator) obj;
+ return getBigDecimalValue().equals(o.getBigDecimalValue());
+ }
+
+ @Override public int hashCode() {
+ return getBigDecimalValue().hashCode();
+ }
+
+ @Override public MeasureAggregator getNew() {
+ return new SumBigDecimalAggregator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/sum/SumDoubleAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/sum/SumDoubleAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/sum/SumDoubleAggregator.java
new file mode 100644
index 0000000..777318d
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/sum/SumDoubleAggregator.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.sum;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.query.aggregator.MeasureAggregator;
+import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorBasic;
+
+public class SumDoubleAggregator extends AbstractMeasureAggregatorBasic {
+
+  /**
+   * serialVersionUID
+   */
+  private static final long serialVersionUID = 623750056131364540L;
+
+  /**
+   * aggregate value (running sum)
+   */
+  private double aggVal;
+
+  /**
+   * Adds {@code newVal} to the running sum.
+   *
+   * @param newVal new value
+   */
+  @Override public void agg(double newVal) {
+    aggVal += newVal;
+    firstTime = false;
+  }
+
+  /**
+   * Adds {@code newVal} to the running sum.
+   *
+   * @param newVal new value, any {@link Number} (converted with {@link Number#doubleValue()})
+   */
+  @Override public void agg(Object newVal) {
+    aggVal += ((Number) newVal).doubleValue();
+    firstTime = false;
+  }
+
+  /**
+   * Adds the value at {@code index} of the chunk to the sum, unless the chunk's null bitset marks
+   * that row as null.
+   *
+   * @param dataChunk measure column chunk to read from
+   * @param index     row index inside the chunk
+   */
+  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+      aggVal += dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index);
+      firstTime = false;
+    }
+  }
+
+  /**
+   * Serializes the sum as a single double.
+   *
+   * @return serialized sum, or an empty array when no value has been aggregated
+   */
+  @Override public byte[] getByteArray() {
+    if (firstTime) {
+      return new byte[0];
+    }
+    ByteBuffer buffer = ByteBuffer.allocate(CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE);
+    buffer.putDouble(aggVal);
+    return buffer.array();
+  }
+
+  /**
+   * This method will return aggVal
+   *
+   * @return sum value
+   */
+  @Override public Double getDoubleValue() {
+    return aggVal;
+  }
+
+  /**
+   * Adds the sum held by another aggregator to this one; aggregators that have not received any
+   * value are ignored.
+   *
+   * @param aggregator aggregator whose {@link MeasureAggregator#getDoubleValue()} is added
+   */
+  @Override public void merge(MeasureAggregator aggregator) {
+    if (!aggregator.isFirstTime()) {
+      agg(aggregator.getDoubleValue());
+    }
+  }
+
+  /**
+   * This method return the sum value as an object
+   *
+   * @return sum value as an object
+   */
+  @Override public Object getValueObject() {
+    return aggVal;
+  }
+
+  /**
+   * Replaces the current sum. Any {@link Number} is accepted, consistent with
+   * {@link #agg(Object)}; a {@link Double} argument behaves exactly as before.
+   *
+   * @param newValue new sum
+   */
+  @Override public void setNewValue(Object newValue) {
+    aggVal = ((Number) newValue).doubleValue();
+  }
+
+  @Override public boolean isFirstTime() {
+    return firstTime;
+  }
+
+  /**
+   * Reads the first-time flag followed by the sum, as written by {@link #writeData(DataOutput)}.
+   */
+  @Override public void readData(DataInput inPut) throws IOException {
+    firstTime = inPut.readBoolean();
+    aggVal = inPut.readDouble();
+  }
+
+  /**
+   * Writes the first-time flag followed by the sum.
+   */
+  @Override public void writeData(DataOutput output) throws IOException {
+    output.writeBoolean(firstTime);
+    output.writeDouble(aggVal);
+  }
+
+  /**
+   * @return a new aggregator carrying the same sum and first-time flag
+   */
+  @Override public MeasureAggregator getCopy() {
+    SumDoubleAggregator aggr = new SumDoubleAggregator();
+    aggr.aggVal = aggVal;
+    aggr.firstTime = firstTime;
+    return aggr;
+  }
+
+  /**
+   * Adds a sum serialized by {@link #getByteArray()} to this one; an empty array is ignored.
+   *
+   * @param value serialized double, or an empty array
+   */
+  @Override public void merge(byte[] value) {
+    if (0 == value.length) {
+      return;
+    }
+    aggVal += ByteBuffer.wrap(value).getDouble();
+    firstTime = false;
+  }
+
+  @Override public String toString() {
+    return String.valueOf(aggVal);
+  }
+
+  /**
+   * Orders by sum using {@link Double#compare(double, double)}. Unlike plain {@code <}/{@code >},
+   * this gives NaN a definite position and distinguishes -0.0 from 0.0, which keeps the ordering
+   * total and consistent with {@link #equals(Object)} (which uses {@link Double#equals(Object)}).
+   */
+  @Override public int compareTo(MeasureAggregator o) {
+    return Double.compare(getDoubleValue(), o.getDoubleValue());
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (!(obj instanceof SumDoubleAggregator)) {
+      return false;
+    }
+    SumDoubleAggregator o = (SumDoubleAggregator) obj;
+    return getDoubleValue().equals(o.getDoubleValue());
+  }
+
+  @Override public int hashCode() {
+    return getDoubleValue().hashCode();
+  }
+
+  @Override public MeasureAggregator getNew() {
+    return new SumDoubleAggregator();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/sum/SumLongAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/sum/SumLongAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/sum/SumLongAggregator.java
new file mode 100644
index 0000000..7c245d9
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/sum/SumLongAggregator.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.aggregator.impl.sum;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.query.aggregator.MeasureAggregator;
+import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorBasic;
+
+public class SumLongAggregator extends AbstractMeasureAggregatorBasic {
+
+  /**
+   * serialVersionUID
+   */
+  private static final long serialVersionUID = 623750056131364540L;
+
+  /**
+   * aggregate value (running sum)
+   */
+  private long aggVal;
+
+  /**
+   * Adds {@code newVal} to the running sum. Any {@link Number} is accepted (converted with
+   * {@link Number#longValue()}), consistent with SumDoubleAggregator. A bare {@code (long)} cast
+   * would throw ClassCastException for Integer/Short values.
+   *
+   * @param newVal new value
+   */
+  @Override public void agg(Object newVal) {
+    aggVal += ((Number) newVal).longValue();
+    firstTime = false;
+  }
+
+  /**
+   * Adds the value at {@code index} of the chunk to the sum, unless the chunk's null bitset marks
+   * that row as null.
+   *
+   * @param dataChunk measure column chunk to read from
+   * @param index     row index inside the chunk
+   */
+  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
+    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+      aggVal += dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index);
+      firstTime = false;
+    }
+  }
+
+  /**
+   * Serializes the sum as a single long.
+   *
+   * @return serialized sum, or an empty array when no value has been aggregated
+   */
+  @Override public byte[] getByteArray() {
+    if (firstTime) {
+      return new byte[0];
+    }
+    ByteBuffer buffer = ByteBuffer.allocate(CarbonCommonConstants.LONG_SIZE_IN_BYTE);
+    buffer.putLong(aggVal);
+    return buffer.array();
+  }
+
+  /**
+   * This method will return aggVal
+   *
+   * @return sum value
+   */
+  @Override public Long getLongValue() {
+    return aggVal;
+  }
+
+  /**
+   * Adds the sum held by another aggregator to this one; aggregators that have not received any
+   * value are ignored.
+   *
+   * @param aggregator aggregator whose {@link MeasureAggregator#getLongValue()} is added
+   */
+  @Override public void merge(MeasureAggregator aggregator) {
+    if (!aggregator.isFirstTime()) {
+      agg(aggregator.getLongValue());
+    }
+  }
+
+  /**
+   * This method return the sum value as an object
+   *
+   * @return sum long value as an object
+   */
+  @Override public Object getValueObject() {
+    return aggVal;
+  }
+
+  /**
+   * Replaces the current sum. Any {@link Number} is accepted, consistent with
+   * {@link #agg(Object)}; a {@link Long} argument behaves exactly as before.
+   *
+   * @param newValue new sum
+   */
+  @Override public void setNewValue(Object newValue) {
+    aggVal = ((Number) newValue).longValue();
+  }
+
+  /**
+   * Reads the first-time flag followed by the sum, as written by {@link #writeData(DataOutput)}.
+   */
+  @Override public void readData(DataInput inPut) throws IOException {
+    firstTime = inPut.readBoolean();
+    aggVal = inPut.readLong();
+  }
+
+  /**
+   * Writes the first-time flag followed by the sum.
+   */
+  @Override public void writeData(DataOutput output) throws IOException {
+    output.writeBoolean(firstTime);
+    output.writeLong(aggVal);
+  }
+
+  /**
+   * @return a new aggregator carrying the same sum and first-time flag
+   */
+  @Override public MeasureAggregator getCopy() {
+    SumLongAggregator aggr = new SumLongAggregator();
+    aggr.aggVal = aggVal;
+    aggr.firstTime = firstTime;
+    return aggr;
+  }
+
+  /**
+   * Adds a sum serialized by {@link #getByteArray()} to this one; an empty array is ignored.
+   *
+   * @param value serialized long, or an empty array
+   */
+  @Override public void merge(byte[] value) {
+    if (0 == value.length) {
+      return;
+    }
+    aggVal += ByteBuffer.wrap(value).getLong();
+    firstTime = false;
+  }
+
+  @Override public String toString() {
+    return String.valueOf(aggVal);
+  }
+
+  /**
+   * Orders by sum.
+   */
+  @Override public int compareTo(MeasureAggregator o) {
+    return Long.compare(getLongValue(), o.getLongValue());
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (!(obj instanceof SumLongAggregator)) {
+      return false;
+    }
+    SumLongAggregator o = (SumLongAggregator) obj;
+    return getLongValue().equals(o.getLongValue());
+  }
+
+  @Override public int hashCode() {
+    return getLongValue().hashCode();
+  }
+
+  @Override public MeasureAggregator getNew() {
+    return new SumLongAggregator();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/util/MeasureAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/util/MeasureAggregatorFactory.java b/core/src/main/java/org/carbondata/query/aggregator/util/MeasureAggregatorFactory.java
index 483392e..b46c4de 100644
--- a/core/src/main/java/org/carbondata/query/aggregator/util/MeasureAggregatorFactory.java
+++ b/core/src/main/java/org/carbondata/query/aggregator/util/MeasureAggregatorFactory.java
@@ -24,28 +24,28 @@ import org.carbondata.core.carbon.metadata.datatype.DataType;
import org.carbondata.core.constants.CarbonCommonConstants;
import org.carbondata.query.aggregator.CustomMeasureAggregator;
import org.carbondata.query.aggregator.MeasureAggregator;
-import org.carbondata.query.aggregator.impl.AvgBigDecimalAggregator;
-import org.carbondata.query.aggregator.impl.AvgDoubleAggregator;
-import org.carbondata.query.aggregator.impl.AvgLongAggregator;
-import org.carbondata.query.aggregator.impl.CountAggregator;
-import org.carbondata.query.aggregator.impl.DistinctCountAggregatorObjectSet;
-import org.carbondata.query.aggregator.impl.DistinctCountBigDecimalAggregatorObjectSet;
-import org.carbondata.query.aggregator.impl.DistinctCountLongAggregatorObjectSet;
-import org.carbondata.query.aggregator.impl.DummyBigDecimalAggregator;
-import org.carbondata.query.aggregator.impl.DummyDoubleAggregator;
-import org.carbondata.query.aggregator.impl.DummyLongAggregator;
-import org.carbondata.query.aggregator.impl.MaxAggregator;
-import org.carbondata.query.aggregator.impl.MaxBigDecimalAggregator;
-import org.carbondata.query.aggregator.impl.MaxLongAggregator;
-import org.carbondata.query.aggregator.impl.MinAggregator;
-import org.carbondata.query.aggregator.impl.MinBigDecimalAggregator;
-import org.carbondata.query.aggregator.impl.MinLongAggregator;
-import org.carbondata.query.aggregator.impl.SumBigDecimalAggregator;
-import org.carbondata.query.aggregator.impl.SumDistinctBigDecimalAggregator;
-import org.carbondata.query.aggregator.impl.SumDistinctDoubleAggregator;
-import org.carbondata.query.aggregator.impl.SumDistinctLongAggregator;
-import org.carbondata.query.aggregator.impl.SumDoubleAggregator;
-import org.carbondata.query.aggregator.impl.SumLongAggregator;
+import org.carbondata.query.aggregator.impl.avg.AvgBigDecimalAggregator;
+import org.carbondata.query.aggregator.impl.avg.AvgDoubleAggregator;
+import org.carbondata.query.aggregator.impl.avg.AvgLongAggregator;
+import org.carbondata.query.aggregator.impl.count.CountAggregator;
+import org.carbondata.query.aggregator.impl.distinct.DistinctCountAggregatorObjectSet;
+import org.carbondata.query.aggregator.impl.distinct.DistinctCountBigDecimalAggregatorObjectSet;
+import org.carbondata.query.aggregator.impl.distinct.DistinctCountLongAggregatorObjectSet;
+import org.carbondata.query.aggregator.impl.distinct.SumDistinctBigDecimalAggregator;
+import org.carbondata.query.aggregator.impl.distinct.SumDistinctDoubleAggregator;
+import org.carbondata.query.aggregator.impl.distinct.SumDistinctLongAggregator;
+import org.carbondata.query.aggregator.impl.dummy.DummyBigDecimalAggregator;
+import org.carbondata.query.aggregator.impl.dummy.DummyDoubleAggregator;
+import org.carbondata.query.aggregator.impl.dummy.DummyLongAggregator;
+import org.carbondata.query.aggregator.impl.max.MaxAggregator;
+import org.carbondata.query.aggregator.impl.max.MaxBigDecimalAggregator;
+import org.carbondata.query.aggregator.impl.max.MaxLongAggregator;
+import org.carbondata.query.aggregator.impl.min.MinAggregator;
+import org.carbondata.query.aggregator.impl.min.MinBigDecimalAggregator;
+import org.carbondata.query.aggregator.impl.min.MinLongAggregator;
+import org.carbondata.query.aggregator.impl.sum.SumBigDecimalAggregator;
+import org.carbondata.query.aggregator.impl.sum.SumDoubleAggregator;
+import org.carbondata.query.aggregator.impl.sum.SumLongAggregator;
import org.carbondata.query.carbon.model.CustomAggregateExpression;
/**
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/carbon/aggregator/dimension/impl/DirectDictionaryDimensionAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/aggregator/dimension/impl/DirectDictionaryDimensionAggregator.java b/core/src/main/java/org/carbondata/query/carbon/aggregator/dimension/impl/DirectDictionaryDimensionAggregator.java
new file mode 100644
index 0000000..971e4cc
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/carbon/aggregator/dimension/impl/DirectDictionaryDimensionAggregator.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.query.carbon.aggregator.dimension.impl;
+
+import java.nio.ByteBuffer;
+
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.carbondata.core.util.CarbonUtil;
+import org.carbondata.query.aggregator.MeasureAggregator;
+import org.carbondata.query.carbon.aggregator.dimension.DimensionDataAggregator;
+import org.carbondata.query.carbon.executor.util.QueryUtil;
+import org.carbondata.query.carbon.model.DimensionAggregatorInfo;
+import org.carbondata.query.carbon.result.AbstractScannedResult;
+
+/**
+ * Class which will be used to aggregate the direct dictionary dimension data
+ */
+public class DirectDictionaryDimensionAggregator implements DimensionDataAggregator {
+
+  /**
+   * info object which store information about dimension to be aggregated
+   */
+  private DimensionAggregatorInfo dimensionAggeragtorInfo;
+
+  /**
+   * start index of the aggregator for current dimension column
+   */
+  private int aggregatorStartIndex;
+
+  /**
+   * buffer used to convert mdkey to surrogate key
+   */
+  private ByteBuffer buffer;
+
+  /**
+   * data index in the file
+   */
+  private int blockIndex;
+
+  /**
+   * to store index which will be used to aggregate
+   * number type value like sum avg
+   */
+  private int[] numberTypeAggregatorIndex;
+
+  /**
+   * generator used to turn a surrogate key back into its actual value
+   */
+  private DirectDictionaryGenerator directDictionaryGenerator;
+
+  /**
+   * to store index which will be used to aggregate
+   * actual type value like max, min, distinct count
+   */
+  private int[] actualTypeAggregatorIndex;
+
+  /**
+   * @param dimensionAggeragtorInfo dimension and aggregate functions to apply to it
+   * @param aggregatorStartIndex    offset of this dimension's first aggregator in the array
+   *                                passed to {@link #aggregateDimensionData}
+   * @param blockIndex              block index used to read the dimension key
+   */
+  public DirectDictionaryDimensionAggregator(DimensionAggregatorInfo dimensionAggeragtorInfo,
+      int aggregatorStartIndex, int blockIndex) {
+    this.dimensionAggeragtorInfo = dimensionAggeragtorInfo;
+    this.aggregatorStartIndex = aggregatorStartIndex;
+    this.blockIndex = blockIndex;
+    buffer = ByteBuffer.allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE);
+    numberTypeAggregatorIndex =
+        QueryUtil.getNumberTypeIndex(this.dimensionAggeragtorInfo.getAggList());
+    actualTypeAggregatorIndex =
+        QueryUtil.getActualTypeIndex(this.dimensionAggeragtorInfo.getAggList());
+    directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+        .getDirectDictionaryGenerator(this.dimensionAggeragtorInfo.getDim().getDataType());
+  }
+
+  /**
+   * Decodes the current row's direct-dictionary surrogate key into its actual value and feeds it
+   * to the actual-type aggregators, then to the number-type aggregators.
+   *
+   * @param scannedResult scanned result
+   * @param aggeragtor    aggregator used to aggregate the data
+   */
+  @Override public void aggregateDimensionData(AbstractScannedResult scannedResult,
+      MeasureAggregator[] aggeragtor) {
+    byte[] dimensionData = scannedResult.getDimensionKey(blockIndex);
+    int surrogateKey = CarbonUtil.getSurrogateKey(dimensionData, buffer);
+    Object actualValue = directDictionaryGenerator.getValueFromSurrogate(surrogateKey);
+    if (null == actualValue) {
+      // no actual value for this surrogate (presumably the null member -- confirm against the
+      // generator); there is nothing to aggregate, and unboxing it below would throw an NPE
+      return;
+    }
+    // NOTE(review): the division by 1000 presumably rescales the generator's value (e.g. to
+    // milliseconds); confirm the units against DirectDictionaryGenerator
+    Object dataBasedOnDataType = (long) actualValue / 1000;
+    aggregate(aggeragtor, actualTypeAggregatorIndex, dataBasedOnDataType);
+    aggregate(aggeragtor, numberTypeAggregatorIndex, dataBasedOnDataType);
+  }
+
+  /**
+   * Passes {@code value} to every aggregator addressed by {@code indexes}, offset by
+   * {@link #aggregatorStartIndex}.
+   */
+  private void aggregate(MeasureAggregator[] aggregators, int[] indexes, Object value) {
+    for (int index : indexes) {
+      aggregators[aggregatorStartIndex + index].agg(value);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/InternalCountStartQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/InternalCountStartQueryExecutor.java b/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/InternalCountStartQueryExecutor.java
index e13aee2..14c336d 100644
--- a/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/InternalCountStartQueryExecutor.java
+++ b/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/InternalCountStartQueryExecutor.java
@@ -24,7 +24,7 @@ import java.util.List;
import org.carbondata.core.carbon.datastore.block.AbstractIndex;
import org.carbondata.core.iterator.CarbonIterator;
import org.carbondata.query.aggregator.MeasureAggregator;
-import org.carbondata.query.aggregator.impl.CountAggregator;
+import org.carbondata.query.aggregator.impl.count.CountAggregator;
import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
import org.carbondata.query.carbon.executor.internal.InternalQueryExecutor;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/carbon/executor/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/executor/util/QueryUtil.java b/core/src/main/java/org/carbondata/query/carbon/executor/util/QueryUtil.java
index ebd90f9..789f77e 100644
--- a/core/src/main/java/org/carbondata/query/carbon/executor/util/QueryUtil.java
+++ b/core/src/main/java/org/carbondata/query/carbon/executor/util/QueryUtil.java
@@ -53,6 +53,7 @@ import org.carbondata.core.util.CarbonUtil;
import org.carbondata.core.util.CarbonUtilException;
import org.carbondata.query.carbon.aggregator.dimension.DimensionDataAggregator;
import org.carbondata.query.carbon.aggregator.dimension.impl.ColumnGroupDimensionsAggregator;
+import org.carbondata.query.carbon.aggregator.dimension.impl.DirectDictionaryDimensionAggregator;
import org.carbondata.query.carbon.aggregator.dimension.impl.FixedLengthDimensionAggregator;
import org.carbondata.query.carbon.aggregator.dimension.impl.VariableLengthDimensionAggregator;
import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
@@ -724,9 +725,15 @@ public class QueryUtil {
aggregatorStartIndex += numberOfAggregatorForColumnGroup;
continue;
} else {
+ if(CarbonUtil.hasEncoding(dim.getEncoder(), Encoding.DIRECT_DICTIONARY)){
+ dimensionDataAggregators.add(
+ new DirectDictionaryDimensionAggregator(entry.getValue().get(0),
+ aggregatorStartIndex,
+ dimensionToBlockIndexMapping.get(dim.getOrdinal())));
+ }
// if it is a dictionary column than create a fixed length
// aggeragtor
- if (CarbonUtil
+ else if (CarbonUtil
.hasEncoding(dim.getEncoder(), Encoding.DICTIONARY)) {
dimensionDataAggregators.add(
new FixedLengthDimensionAggregator(entry.getValue().get(0), null,
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/QueryResultPreparatorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/QueryResultPreparatorImpl.java b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/QueryResultPreparatorImpl.java
index 0957e7a..5604ecd 100644
--- a/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/QueryResultPreparatorImpl.java
+++ b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/QueryResultPreparatorImpl.java
@@ -32,9 +32,9 @@ import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerat
import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
import org.carbondata.core.util.CarbonUtil;
import org.carbondata.query.aggregator.MeasureAggregator;
-import org.carbondata.query.aggregator.impl.CountAggregator;
-import org.carbondata.query.aggregator.impl.DistinctCountAggregator;
-import org.carbondata.query.aggregator.impl.DistinctStringCountAggregator;
+import org.carbondata.query.aggregator.impl.count.CountAggregator;
+import org.carbondata.query.aggregator.impl.distinct.DistinctCountAggregator;
+import org.carbondata.query.aggregator.impl.distinct.DistinctStringCountAggregator;
import org.carbondata.query.carbon.executor.impl.QueryExecutorProperties;
import org.carbondata.query.carbon.model.DimensionAggregatorInfo;
import org.carbondata.query.carbon.model.QueryDimension;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/examples/src/main/scala/org/carbondata/examples/GenerateDictionaryExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/carbondata/examples/GenerateDictionaryExample.scala b/examples/src/main/scala/org/carbondata/examples/GenerateDictionaryExample.scala
index 69e1d2f..a549409 100644
--- a/examples/src/main/scala/org/carbondata/examples/GenerateDictionaryExample.scala
+++ b/examples/src/main/scala/org/carbondata/examples/GenerateDictionaryExample.scala
@@ -63,7 +63,7 @@ object GenerateDictionaryExample {
val tableName = carbonTableIdentifier.getTableName
val carbonRelation = CarbonEnv.getInstance(carbonContext).carbonCatalog.
lookupRelation1(Option(dataBaseName),
- tableName, None) (carbonContext).asInstanceOf[CarbonRelation]
+ tableName) (carbonContext).asInstanceOf[CarbonRelation]
val carbonTable = carbonRelation.cubeMeta.carbonTable
val dimensions = carbonTable.getDimensionByTableName(tableName)
.toArray.map(_.asInstanceOf[CarbonDimension])
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index 1cd4be4..c4f09cc 100644
--- a/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -19,12 +19,13 @@ package org.apache.spark.sql.common.util
import java.util.{Locale, TimeZone}
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import scala.collection.JavaConversions._
+
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.columnar.InMemoryRelation
+import org.apache.spark.sql.execution.columnar.InMemoryRelation
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
-import scala.collection.JavaConversions._
class QueryTest extends PlanTest {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase1.scala
----------------------------------------------------------------------
diff --git a/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase1.scala b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase1.scala
index ec23b80..bd7b596 100644
--- a/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase1.scala
+++ b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase1.scala
@@ -81,6 +81,34 @@ class AllDataTypesTestCase1 extends QueryTest with BeforeAndAfterAll {
"Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber," +
"Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId," +
"gamePointId,gamePointDescription')")
+
+ sql(
+ "create table if not exists Carbon_automation_hive (imei string,deviceInformationId int," +
+ "MAC string,deviceColor string,device_backColor string,modelId string,marketName " +
+ "string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string," +
+ "productionDate timestamp,bomCode string,internalModels string, deliveryTime string, " +
+ "channelsId string, channelsName string , deliveryAreaId string, deliveryCountry " +
+ "string, deliveryProvince string, deliveryCity string,deliveryDistrict string, " +
+ "deliveryStreet string, oxSingleNumber string,contractNumber int, ActiveCheckTime string, ActiveAreaId " +
+ "string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict" +
+ " string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, " +
+ "Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, " +
+ "Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string," +
+ "Active_webTypeDataVerNumber string, Active_operatorsVersion string, " +
+ "Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, " +
+ "Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, " +
+ "Latest_province string, Latest_city string, Latest_district string, Latest_street " +
+ "string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion " +
+ "string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion " +
+ "string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, " +
+ "Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, " +
+ "Latest_operatorId string, gamePointId int, gamePointDescription string" +
+ ") row format delimited fields terminated by ','"
+ )
+
+ sql("LOAD DATA LOCAL INPATH '" + currentDirectory + "/src/test/resources/100_olap.csv' INTO " +
+ "table Carbon_automation_hive ")
+
} catch {
case e: Exception => print("ERROR: DROP Carbon_automation_test ")
}
@@ -88,6 +116,7 @@ class AllDataTypesTestCase1 extends QueryTest with BeforeAndAfterAll {
override def afterAll {
sql("drop cube Carbon_automation_test")
+ sql("drop table Carbon_automation_hive")
}
@@ -853,7 +882,7 @@ class AllDataTypesTestCase1 extends QueryTest with BeforeAndAfterAll {
test("select variance(deviceInformationId) as a from Carbon_automation_test")({
checkAnswer(
sql("select variance(deviceInformationId) as a from Carbon_automation_test"),
- Seq(Row(9.31041555963636E9))
+ sql("select variance(deviceInformationId) as a from Carbon_automation_hive")
)
}
)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase2.scala
----------------------------------------------------------------------
diff --git a/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase2.scala b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase2.scala
index ab9121a..88ba722 100644
--- a/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase2.scala
+++ b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase2.scala
@@ -81,6 +81,32 @@ class AllDataTypesTestCase2 extends QueryTest with BeforeAndAfterAll {
"Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber," +
"Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId," +
"gamePointId,gamePointDescription')")
+
+ sql(
+ "create table if not exists Carbon_automation_hive2(imei string,deviceInformationId int," +
+ "MAC string,deviceColor string,device_backColor string,modelId string,marketName " +
+ "string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string," +
+ "productionDate timestamp,bomCode string,internalModels string, deliveryTime string, " +
+ "channelsId string, channelsName string , deliveryAreaId string, deliveryCountry " +
+ "string, deliveryProvince string, deliveryCity string,deliveryDistrict string, " +
+ "deliveryStreet string, oxSingleNumber string,contractNumber int, ActiveCheckTime string, ActiveAreaId " +
+ "string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict" +
+ " string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, " +
+ "Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, " +
+ "Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string," +
+ "Active_webTypeDataVerNumber string, Active_operatorsVersion string, " +
+ "Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, " +
+ "Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, " +
+ "Latest_province string, Latest_city string, Latest_district string, Latest_street " +
+ "string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion " +
+ "string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion " +
+ "string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, " +
+ "Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, " +
+ "Latest_operatorId string, gamePointId int,gamePointDescription string" +
+ ") row format delimited fields terminated by ','"
+ )
+ sql("LOAD DATA LOCAL INPATH '" + currentDirectory + "/src/test/resources/100_olap.csv' INTO " +
+ "table Carbon_automation_hive2 ")
} catch {
case e: Exception => print("ERROR: DROP Carbon_automation_test2 ")
}
@@ -89,6 +115,7 @@ class AllDataTypesTestCase2 extends QueryTest with BeforeAndAfterAll {
override def afterAll {
try {
sql("drop cube Carbon_automation_test2")
+ sql("drop table Carbon_automation_hive2")
} catch {
case e: Exception => print("ERROR: DROP Carbon_automation_test2 ")
}
@@ -7902,7 +7929,9 @@ class AllDataTypesTestCase2 extends QueryTest with BeforeAndAfterAll {
sql(
"select variance(deviceInformationId), var_pop(imei) from Carbon_automation_test2 where activeareaid>3"
),
- Seq(Row(1.477644655616972E10, null))
+ sql(
+ "select variance(deviceInformationId), var_pop(imei) from Carbon_automation_hive2 where activeareaid>3"
+ )
)
}
)
@@ -7915,7 +7944,9 @@ class AllDataTypesTestCase2 extends QueryTest with BeforeAndAfterAll {
sql(
"select variance(contractNumber), var_pop(contractNumber) from Carbon_automation_test2 where deliveryareaid>5"
),
- Seq(Row(8.508651970169495E12, 8.508651970169495E12))
+ sql(
+ "select variance(contractNumber), var_pop(contractNumber) from Carbon_automation_hive2 where deliveryareaid>5"
+ )
)
}
)
@@ -7928,7 +7959,9 @@ class AllDataTypesTestCase2 extends QueryTest with BeforeAndAfterAll {
sql(
"select variance(AMSize), var_pop(channelsid) from Carbon_automation_test2 where channelsid>2"
),
- Seq(Row(null, 2.148423005565863))
+ sql(
+ "select variance(AMSize), var_pop(channelsid) from Carbon_automation_hive2 where channelsid>2"
+ )
)
}
)
@@ -7941,7 +7974,9 @@ class AllDataTypesTestCase2 extends QueryTest with BeforeAndAfterAll {
sql(
"select variance(deviceInformationId), var_pop(deviceInformationId) from Carbon_automation_test2 where activeareaid>3"
),
- Seq(Row(1.477644655616972E10, 1.477644655616972E10))
+ sql(
+ "select variance(deviceInformationId), var_pop(deviceInformationId) from Carbon_automation_hive2 where activeareaid>3"
+ )
)
}
)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase6.scala
----------------------------------------------------------------------
diff --git a/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase6.scala b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase6.scala
index 1884e5f..12f55b7 100644
--- a/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase6.scala
+++ b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase6.scala
@@ -53,7 +53,7 @@ class AllDataTypesTestCase6 extends QueryTest with BeforeAndAfterAll {
"bomCode string,internalModels string, deliveryTime string, channelsId string, " +
"channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince " +
"string, deliveryCity string,deliveryDistrict string, deliveryStreet string, " +
- "oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry " +
+ "oxSingleNumber string,contractNumber int, ActiveCheckTime string, ActiveAreaId string, ActiveCountry " +
"string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet " +
"string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, " +
"Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, " +
@@ -65,8 +65,8 @@ class AllDataTypesTestCase6 extends QueryTest with BeforeAndAfterAll {
"Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, " +
"Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string," +
" Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, " +
- "Latest_phonePADPartitionedVersions string, Latest_operatorId string, " +
- "gamePointDescription string, gamePointId int,contractNumber int) row format " +
+ "Latest_phonePADPartitionedVersions string, Latest_operatorId string,gamePointId int," +
+ "gamePointDescription string) row format " +
"delimited fields terminated by ','"
)
@@ -211,7 +211,7 @@ class AllDataTypesTestCase6 extends QueryTest with BeforeAndAfterAll {
test("select sum( DISTINCT channelsId) a from Carbon_automation_test6")({
checkAnswer(
sql("select sum( DISTINCT channelsId) a from Carbon_automation_test6"),
- Seq(Row(428.0)))
+ sql("select sum( DISTINCT channelsId) a from hivetable"))
})
//TC_083
@@ -263,7 +263,7 @@ class AllDataTypesTestCase6 extends QueryTest with BeforeAndAfterAll {
test("select variance(gamePointId) as a from Carbon_automation_test6")({
checkAnswer(
sql("select variance(gamePointId) as a from Carbon_automation_test6"),
- Seq(Row(654787.843930927)))
+ sql("select variance(gamePointId) as a from hivetable"))
})
//TC_120
@@ -732,14 +732,14 @@ class AllDataTypesTestCase6 extends QueryTest with BeforeAndAfterAll {
test("select variance(gamepointid), var_pop(gamepointid) from Carbon_automation_test6 where channelsid>2")({
checkAnswer(
sql("select variance(gamepointid), var_pop(gamepointid) from Carbon_automation_test6 where channelsid>2"),
- Seq(Row(622630.4599570761, 622630.4599570761)))
+ sql("select variance(gamepointid), var_pop(gamepointid) from hivetable where channelsid>2"))
})
//TC_445
test("select variance(bomcode), var_pop(gamepointid) from Carbon_automation_test6 where activeareaid>3")({
checkAnswer(
sql("select variance(bomcode), var_pop(gamepointid) from Carbon_automation_test6 where activeareaid>3"),
- Seq(Row(1.4776446556169722E10, 663683.3954750763)))
+ sql("select variance(bomcode), var_pop(gamepointid) from hivetable where activeareaid>3"))
})
//TC_447
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark/pom.xml b/integration/spark/pom.xml
index 0682a42..c98c9fb 100644
--- a/integration/spark/pom.xml
+++ b/integration/spark/pom.xml
@@ -65,17 +65,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>eigenbase</groupId>
- <artifactId>eigenbase-xom</artifactId>
- <version>1.3.4</version>
- <exclusions>
- <exclusion>
- <groupId>*</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
<version>6.5.0</version>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonAggregate.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonAggregate.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonAggregate.scala
deleted file mode 100644
index 93cf675..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonAggregate.scala
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.util.HashMap
-
-import scala.Array.{canBuildFrom, fallbackCanBuildFrom}
-
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.errors.attachTree
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
-
-/**
- * :: DeveloperApi ::
- * Groups input data by `groupingExpressions` and computes the `aggregateExpressions` for each
- * group.
- *
- * @param partial if true then aggregation is done partially on local data without
- * shuffling to
- * ensure all values where `groupingExpressions` are equal are present.
- * @param groupingExpressions expressions that are evaluated to determine grouping.
- * @param aggregateExpressions expressions that are computed for each group.
- * @param child the input data source.
- */
-@DeveloperApi
-case class CarbonAggregate(
- partial: Boolean,
- groupingExpressions: Seq[Expression],
- aggregateExpressions: Seq[NamedExpression],
- child: SparkPlan)(@transient sqlContext: SQLContext)
- extends UnaryNode {
-
- override def requiredChildDistribution: Seq[Distribution] = {
- if (partial) {
- UnspecifiedDistribution :: Nil
- } else {
- if (groupingExpressions == Nil) {
- AllTuples :: Nil
- } else {
- ClusteredDistribution(groupingExpressions) :: Nil
- }
- }
- }
-
- override def otherCopyArgs: Seq[AnyRef] = sqlContext :: Nil
-
- // HACK: Generators don't correctly preserve their output through serializations so we grab
- // out child's output attributes statically here.
- private[this] val childOutput = child.output
-
- override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
-
- /**
- * An aggregate that needs to be computed for each row in a group.
- *
- * @param unbound Unbound version of this aggregate, used for result substitution.
- * @param aggregate A bound copy of this aggregate used to create a new aggregation buffer.
- * @param resultAttribute An attribute used to refer to the result of this aggregate in the final
- * output.
- */
- case class ComputedAggregate(unbound: AggregateExpression1,
- aggregate: AggregateExpression1,
- resultAttribute: AttributeReference)
-
- /** A list of aggregates that need to be computed for each group. */
- private[this] val computedAggregates = aggregateExpressions.flatMap { agg =>
- agg.collect {
- case a: AggregateExpression1 =>
- ComputedAggregate(
- a,
- BindReferences.bindReference(a, childOutput),
- AttributeReference(s"aggResult:$a", a.dataType, a.nullable)())
- }
- }.toArray
-
- /** The schema of the result of all aggregate evaluations */
- private[this] val computedSchema = computedAggregates.map(_.resultAttribute)
-
- /** Creates a new aggregate buffer for a group. */
- private[this] def newAggregateBuffer(): Array[AggregateFunction1] = {
- val buffer = new Array[AggregateFunction1](computedAggregates.length)
- var i = 0
- while (i < computedAggregates.length) {
- buffer(i) = computedAggregates(i).aggregate.newInstance()
- i += 1
- }
- buffer
- }
-
- /** Named attributes used to substitute grouping attributes into the final result. */
- private[this] val namedGroups = groupingExpressions.map {
- case ne: NamedExpression => ne -> ne.toAttribute
- case e => e -> Alias(e, s"groupingExpr:$e")().toAttribute
- }
-
- /**
- * A map of substitutions that are used to insert the aggregate expressions and grouping
- * expression into the final result expression.
- */
- private[this] val resultMap =
- (computedAggregates.map { agg => agg.unbound -> agg.resultAttribute } ++ namedGroups).toMap
-
- /**
- * Substituted version of aggregateExpressions expressions which are used to compute final
- * output rows given a group and the result of all aggregate computations.
- */
- private[this] val resultExpressions = aggregateExpressions.map { agg =>
- agg.transform {
- case e: Expression if resultMap.contains(e) => resultMap(e)
- }
- }
-
- override def doExecute(): RDD[InternalRow] = {
- attachTree(this, "execute") {
- if (groupingExpressions.isEmpty) {
- child.execute().mapPartitions { iter =>
- val buffer = newAggregateBuffer()
- var currentRow: InternalRow = null
- while (iter.hasNext) {
- currentRow = iter.next()
- var i = 0
- while (i < buffer.length) {
- buffer(i).update(currentRow)
- i += 1
- }
- }
- val resultProjection = new InterpretedProjection(resultExpressions, computedSchema)
- val aggregateResults = new GenericMutableRow(computedAggregates.length)
-
- var i = 0
- while (i < buffer.length) {
- aggregateResults(i) = buffer(i).eval(EmptyRow)
- i += 1
- }
-
- Iterator(resultProjection(aggregateResults))
- }
- } else {
- child.execute().mapPartitions { iter =>
- val hashTable = new HashMap[InternalRow, Array[AggregateFunction1]]
- val groupingProjection = new InterpretedMutableProjection(groupingExpressions,
- childOutput)
-
- var currentRow: InternalRow = null
- while (iter.hasNext) {
- currentRow = iter.next()
- val currentGroup = groupingProjection(currentRow)
- var currentBuffer = hashTable.get(currentGroup)
- if (currentBuffer == null) {
- currentBuffer = newAggregateBuffer()
- hashTable.put(currentGroup.copy(), currentBuffer)
- }
-
- var i = 0
- while (i < currentBuffer.length) {
- currentBuffer(i).update(currentRow)
- i += 1
- }
- }
-
- new Iterator[InternalRow] {
- private[this] val hashTableIter = hashTable.entrySet().iterator()
- private[this] val aggregateResults = new GenericMutableRow(computedAggregates.length)
- private[this] val resultProjection =
- new InterpretedMutableProjection(resultExpressions,
- computedSchema ++ namedGroups.map(_._2))
- private[this] val joinedRow = new JoinedRow
-
- override final def hasNext: Boolean = hashTableIter.hasNext
-
- override final def next(): InternalRow = {
- val currentEntry = hashTableIter.next()
- val currentGroup = currentEntry.getKey
- val currentBuffer = currentEntry.getValue
-
- var i = 0
- while (i < currentBuffer.length) {
- // Evaluating an aggregate buffer returns the result. No row is required since we
- // already added all rows in the group using update.
- aggregateResults(i) = currentBuffer(i).eval(EmptyRow)
- i += 1
- }
- resultProjection(joinedRow(aggregateResults, currentGroup))
- }
- }
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index e574348..f728a32 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -19,18 +19,18 @@ package org.apache.spark.sql
import scala.collection.mutable.MutableList
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.agg.{CarbonAverage, CarbonCount}
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, _}
-import org.apache.spark.sql.catalyst.trees.TreeNodeRef
import org.apache.spark.sql.execution.command.tableModel
+import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.optimizer.{CarbonAliasDecoderRelation, CarbonDecoderRelation}
-import org.apache.spark.sql.types.{BooleanType, DataType, StringType, TimestampType}
-
-import org.carbondata.spark.agg._
+import org.apache.spark.sql.types._
/**
* Top command
@@ -94,7 +94,7 @@ case class ShowCubeCommand(schemaNameOp: Option[String]) extends LogicalPlan wit
override def children: Seq[LogicalPlan] = Seq.empty
override def output: Seq[Attribute] = {
- Seq(AttributeReference("cubeName", StringType, nullable = false)(),
+ Seq(AttributeReference("tableName", StringType, nullable = false)(),
AttributeReference("isRegisteredWithSpark", BooleanType, nullable = false)())
}
}
@@ -107,8 +107,8 @@ case class ShowAllCubeCommand() extends LogicalPlan with Command {
override def children: Seq[LogicalPlan] = Seq.empty
override def output: Seq[Attribute] = {
- Seq(AttributeReference("schemaName", StringType, nullable = false)(),
- AttributeReference("cubeName", StringType, nullable = false)(),
+ Seq(AttributeReference("dbName", StringType, nullable = false)(),
+ AttributeReference("tableName", StringType, nullable = false)(),
AttributeReference("isRegisteredWithSpark", BooleanType, nullable = false)())
}
}
@@ -161,7 +161,7 @@ case class ShowLoadsCommand(schemaNameOp: Option[String], cube: String, limit: O
/**
* Describe formatted for hive table
*/
-case class DescribeFormattedCommand(sql: String, tblIdentifier: Seq[String])
+case class DescribeFormattedCommand(sql: String, tblIdentifier: TableIdentifier)
extends LogicalPlan with Command {
override def children: Seq[LogicalPlan] = Seq.empty
@@ -181,16 +181,18 @@ case class CarbonDictionaryCatalystDecoder(
override def output: Seq[Attribute] = child.output
}
-abstract class CarbonProfile(attributes: Seq[Attribute]) extends Serializable{
+abstract class CarbonProfile(attributes: Seq[Attribute]) extends Serializable {
def isEmpty: Boolean = attributes.isEmpty
}
+
case class IncludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attributes)
+
case class ExcludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attributes)
case class FakeCarbonCast(child: Literal, dataType: DataType)
extends LeafExpression with CodegenFallback {
- override def toString: String = s"FakeCarbonCast($child as ${dataType.simpleString})"
+ override def toString: String = s"FakeCarbonCast($child as ${ dataType.simpleString })"
override def checkInputDataTypes(): TypeCheckResult = {
TypeCheckResult.TypeCheckSuccess
@@ -255,7 +257,7 @@ object PhysicalOperation1 extends PredicateHelper {
val (fields, filters, other, aliases, _, sortOrder, limit) = collectProjectsAndFilters(
child)
- var aggExps: Seq[AggregateExpression1] = Nil
+ var aggExps: Seq[AggregateExpression] = Nil
aggregateExpressions.foreach(v => {
val list = findAggreagateExpression(v)
aggExps = aggExps ++ list
@@ -276,12 +278,12 @@ object PhysicalOperation1 extends PredicateHelper {
}
}
- def findAggreagateExpression(expr: Expression): Seq[AggregateExpression1] = {
+ def findAggreagateExpression(expr: Expression): Seq[AggregateExpression] = {
val exprList = expr match {
- case d: AggregateExpression1 => d :: Nil
+ case d: AggregateExpression => d :: Nil
case Alias(ref, name) => findAggreagateExpression(ref)
case other =>
- var listout: Seq[AggregateExpression1] = Nil
+ var listout: Seq[AggregateExpression] = Nil
other.children.foreach(v => {
val list = findAggreagateExpression(v)
@@ -317,7 +319,7 @@ object PhysicalOperation1 extends PredicateHelper {
case Alias(ref, name) => ref
case others => others
}.filter {
- case d: AggregateExpression1 => true
+ case d: AggregateExpression => true
case _ => false
}
(fields, filters, other, aliases ++ collectAliases(aggregateExpressions), Some(
@@ -352,6 +354,28 @@ object PhysicalOperation1 extends PredicateHelper {
}
}
+case class PositionLiteral(expr: Expression, intermediateDataType: DataType)
+ extends LeafExpression with CodegenFallback {
+ override def dataType: DataType = expr.dataType
+
+ override def nullable: Boolean = false
+
+ type EvaluatedType = Any
+ var position = -1
+
+ def setPosition(pos: Int): Unit = position = pos
+
+ override def toString: String = s"PositionLiteral($position : $expr)"
+
+ override def eval(input: InternalRow): Any = {
+ if (position != -1) {
+ input.get(position, intermediateDataType)
+ } else {
+ expr.eval(input)
+ }
+ }
+}
+
/**
* Matches a logical aggregation that can be performed on distributed data in two steps. The first
* operates on the data in each partition performing partial aggregation for each group. The second
@@ -367,85 +391,98 @@ object PhysicalOperation1 extends PredicateHelper {
* - Partial aggregate expressions.
* - Input to the aggregation.
*/
-object PartialAggregation {
- type ReturnType =
- (Seq[Attribute], Seq[NamedExpression], Seq[Expression], Seq[NamedExpression], LogicalPlan)
+object CarbonAggregation {
+ type ReturnType = (Seq[Expression], Seq[NamedExpression], LogicalPlan)
private def convertAggregatesForPushdown(convertUnknown: Boolean,
- rewrittenAggregateExpressions: Seq[Expression]) = {
- var counter: Int = 0
- var updatedExpressions = MutableList[Expression]()
- rewrittenAggregateExpressions.foreach(v => {
- val updated = convertAggregate(v, counter, convertUnknown)
- updatedExpressions += updated
- counter = counter + 1
- })
- updatedExpressions
+ rewrittenAggregateExpressions: Seq[Expression],
+ oneAttr: AttributeReference) = {
+ if (canBeConvertedToCarbonAggregate(rewrittenAggregateExpressions)) {
+ var counter: Int = 0
+ var updatedExpressions = MutableList[Expression]()
+ rewrittenAggregateExpressions.foreach(v => {
+ val updated = convertAggregate(v, counter, convertUnknown, oneAttr)
+ updatedExpressions += updated
+ counter = counter + 1
+ })
+ updatedExpressions
+ } else {
+ rewrittenAggregateExpressions
+ }
}
- def makePositionLiteral(expr: Expression, index: Int): PositionLiteral = {
- val posLiteral = PositionLiteral(expr, MeasureAggregatorUDT)
+ def makePositionLiteral(expr: Expression, index: Int, dataType: DataType): PositionLiteral = {
+ val posLiteral = PositionLiteral(expr, dataType)
posLiteral.setPosition(index)
posLiteral
}
- def convertAggregate(current: Expression, index: Int, convertUnknown: Boolean): Expression = {
- if (convertUnknown) {
+ def convertAggregate(current: Expression,
+ index: Int,
+ convertUnknown: Boolean,
+ oneAttr: AttributeReference): Expression = {
+ if (!convertUnknown && canBeConverted(current)) {
current.transform {
- case a@SumCarbon(_, _) => a
- case a@AverageCarbon(_, _) => a
- case a@MinCarbon(_, _) => a
- case a@MaxCarbon(_, _) => a
- case a@SumDistinctCarbon(_, _) => a
- case a@CountDistinctCarbon(_) => a
- case a@CountCarbon(_) => a
- case anyAggr: AggregateExpression1 => anyAggr
+ case Average(attr: AttributeReference) =>
+ val convertedDataType = transformArrayType(attr)
+ CarbonAverage(makePositionLiteral(convertedDataType, index, convertedDataType.dataType))
+ case Average(Cast(attr: AttributeReference, dataType)) =>
+ val convertedDataType = transformArrayType(attr)
+ CarbonAverage(
+ makePositionLiteral(convertedDataType, index, convertedDataType.dataType))
+ case Count(Seq(s: Literal)) =>
+ CarbonCount(s, Some(makePositionLiteral(transformLongType(oneAttr), index, LongType)))
+ case Count(Seq(attr: AttributeReference)) =>
+ CarbonCount(makePositionLiteral(transformLongType(attr), index, LongType))
+ case Sum(attr: AttributeReference) =>
+ Sum(makePositionLiteral(attr, index, attr.dataType))
+ case Sum(Cast(attr: AttributeReference, dataType)) =>
+ Sum(Cast(makePositionLiteral(attr, index, attr.dataType), dataType))
+ case Min(attr: AttributeReference) => Min(makePositionLiteral(attr, index, attr.dataType))
+ case Min(Cast(attr: AttributeReference, dataType)) =>
+ Min(Cast(makePositionLiteral(attr, index, attr.dataType), dataType))
+ case Max(attr: AttributeReference) =>
+ Max(makePositionLiteral(attr, index, attr.dataType))
+ case Max(Cast(attr: AttributeReference, dataType)) =>
+ Max(Cast(makePositionLiteral(attr, index, attr.dataType), dataType))
}
} else {
- current.transform {
- case a@Sum(attr: AttributeReference) => SumCarbon(makePositionLiteral(attr, index))
- case a@Sum(cast@Cast(attr: AttributeReference, _)) => SumCarbon(
- makePositionLiteral(attr, index), cast.dataType)
- case a@Average(attr: AttributeReference) => AverageCarbon(makePositionLiteral(attr, index))
- case a@Average(cast@Cast(attr: AttributeReference, _)) => AverageCarbon(
- makePositionLiteral(attr, index), cast.dataType)
- case a@Min(attr: AttributeReference) => MinCarbon(makePositionLiteral(attr, index))
- case a@Min(cast@Cast(attr: AttributeReference, _)) => MinCarbon(
- makePositionLiteral(attr, index), cast.dataType)
- case a@Max(attr: AttributeReference) => MaxCarbon(makePositionLiteral(attr, index))
- case a@Max(cast@Cast(attr: AttributeReference, _)) => MaxCarbon(
- makePositionLiteral(attr, index), cast.dataType)
- case a@SumDistinct(attr: AttributeReference) => SumDistinctCarbon(
- makePositionLiteral(attr, index))
- case a@SumDistinct(cast@Cast(attr: AttributeReference, _)) => SumDistinctCarbon(
- makePositionLiteral(attr, index), cast.dataType)
- case a@CountDistinct(attr: AttributeReference) => CountDistinctCarbon(
- makePositionLiteral(attr, index))
- case a@CountDistinct(childSeq) if childSeq.size == 1 =>
- childSeq.head match {
- case attr: AttributeReference => CountDistinctCarbon(makePositionLiteral(attr, index))
- case _ => a
- }
- case a@Count(s@Literal(_, _)) =>
- CountCarbon(makePositionLiteral(s, index))
- case a@Count(attr: AttributeReference) =>
- if (attr.name.equals("*")) {
- CountCarbon(makePositionLiteral(Literal("*"), index))
- } else {
- CountCarbon(makePositionLiteral(attr, index))
- }
- }
+ current
}
}
+ def canBeConverted(current: Expression): Boolean = current match {
+ case Alias(AggregateExpression(Average(attr: AttributeReference), _, false), _) => true
+ case Alias(AggregateExpression(Average(Cast(attr: AttributeReference, _)), _, false), _) => true
+ case Alias(AggregateExpression(Count(Seq(s: Literal)), _, false), _) => true
+ case Alias(AggregateExpression(Count(Seq(attr: AttributeReference)), _, false), _) => true
+ case Alias(AggregateExpression(Sum(attr: AttributeReference), _, false), _) => true
+ case Alias(AggregateExpression(Sum(Cast(attr: AttributeReference, _)), _, false), _) => true
+ case Alias(AggregateExpression(Min(attr: AttributeReference), _, false), _) => true
+ case Alias(AggregateExpression(Min(Cast(attr: AttributeReference, _)), _, false), _) => true
+ case Alias(AggregateExpression(Max(attr: AttributeReference), _, false), _) => true
+ case Alias(AggregateExpression(Max(Cast(attr: AttributeReference, _)), _, false), _) => true
+ case _ => false
+ }
+
+ def transformArrayType(attr: AttributeReference): AttributeReference = {
+ AttributeReference(attr.name, ArrayType(DoubleType), attr.nullable, attr.metadata)(attr.exprId,
+ attr.qualifiers)
+ }
+
+ def transformLongType(attr: AttributeReference): AttributeReference = {
+ AttributeReference(attr.name, LongType, attr.nullable, attr.metadata)(attr.exprId,
+ attr.qualifiers)
+ }
+
/**
* There should be sync between carbonOperators validation and here. we should not convert to
* carbon aggregates if the validation does not satisfy.
*/
- private def canBeConvertedToCarbonAggregate(expressions: Seq[Expression]): Boolean = {
+ def canBeConvertedToCarbonAggregate(expressions: Seq[Expression]): Boolean = {
val detailQuery = expressions.map {
case attr@AttributeReference(_, _, _, _) => true
- case par: Alias if par.children.head.isInstanceOf[AggregateExpression1] => true
+ case Alias(agg: AggregateExpression, name) => true
case _ => false
}.exists(!_)
!detailQuery
@@ -454,6 +491,7 @@ object PartialAggregation {
def unapply(plan: LogicalPlan): Option[ReturnType] = unapply((plan, false))
def unapply(combinedPlan: (LogicalPlan, Boolean)): Option[ReturnType] = {
+ val oneAttr = getOneAttribute(combinedPlan._1)
combinedPlan._1 match {
case Aggregate(groupingExpressions, aggregateExpressionsOrig, child) =>
@@ -463,99 +501,28 @@ object PartialAggregation {
aggregateExpressionsOrig
}
else {
- // First calculate partialComputation before converting and then check whether it could
- // be converted or not. This type of checks are necessary for queries like
- // select sum(col)+10 from table. Here the aggregates are different for
- // partialComputation and aggregateExpressionsOrig. So first check on partialComputation
- val preCheckEval = getPartialEvaluation(groupingExpressions, aggregateExpressionsOrig)
- preCheckEval match {
- case Some(allExprs) =>
- if (canBeConvertedToCarbonAggregate(allExprs._1)) {
- convertAggregatesForPushdown(false, aggregateExpressionsOrig)
- } else {
- aggregateExpressionsOrig
- }
- case _ => aggregateExpressionsOrig
- }
+ convertAggregatesForPushdown(false, aggregateExpressionsOrig, oneAttr)
}
- val evaluation = getPartialEvaluation(groupingExpressions, aggregateExpressions)
-
- evaluation match {
- case(Some((partialComputation,
- rewrittenAggregateExpressions,
- namedGroupingAttributes))) =>
- // Convert the other aggregations for push down to Carbon layer.
- // Here don't touch earlier converted native carbon aggregators.
- val convertedPartialComputation =
- if (combinedPlan._2) {
- partialComputation
- }
- else {
- convertAggregatesForPushdown(true, partialComputation)
- .asInstanceOf[Seq[NamedExpression]]
- }
-
- Some(
- (namedGroupingAttributes,
- rewrittenAggregateExpressions,
- groupingExpressions,
- convertedPartialComputation,
- child))
- case _ => None
- }
-
+ Some((groupingExpressions, aggregateExpressions.asInstanceOf[Seq[NamedExpression]], child))
case _ => None
}
}
- def getPartialEvaluation(groupingExpressions: Seq[Expression],
- aggregateExpressions: Seq[Expression]):
- Option[(Seq[NamedExpression], Seq[NamedExpression], Seq[Attribute])] = {
- // Collect all aggregate expressions.
- val allAggregates =
- aggregateExpressions.flatMap(_ collect { case a: AggregateExpression1 => a })
- // Collect all aggregate expressions that can be computed partially.
- val partialAggregates =
- aggregateExpressions.flatMap(_ collect { case p: PartialAggregate1 => p })
-
- // Only do partial aggregation if supported by all aggregate expressions.
- if (allAggregates.size == partialAggregates.size) {
- // Create a map of expressions to their partial evaluations for all aggregate expressions.
- val partialEvaluations: Map[TreeNodeRef, SplitEvaluation] =
- partialAggregates.map(a => (new TreeNodeRef(a), a.asPartial)).toMap
-
- // We need to pass all grouping expressions though so the grouping can happen a second
- // time. However some of them might be unnamed so we alias them allowing them to be
- // referenced in the second aggregation.
- val namedGroupingExpressions: Map[Expression, NamedExpression] = groupingExpressions.map {
- case n: NamedExpression => (n, n)
- case other => (other, Alias(other, "PartialGroup")())
- }.toMap
-
- // Replace aggregations with a new expression that computes the result from the already
- // computed partial evaluations and grouping values.
- val rewrittenAggregateExpressions = aggregateExpressions.map(_.transformUp {
- case e: Expression if partialEvaluations.contains(new TreeNodeRef(e)) =>
- partialEvaluations(new TreeNodeRef(e)).finalEvaluation
-
- case e: Expression =>
- // Should trim aliases around `GetField`s. These aliases are introduced while
- // resolving struct field accesses, because `GetField` is not a `NamedExpression`.
- // (Should we just turn `GetField` into a `NamedExpression`?)
- namedGroupingExpressions.collectFirst {
- case (expr, ne) if expr semanticEquals e => ne.toAttribute
- }.getOrElse(e)
- }).asInstanceOf[Seq[NamedExpression]]
-
- val partialComputation =
- (namedGroupingExpressions.values ++
- partialEvaluations.values.flatMap(_.partialEvaluations)).toSeq
- val namedGroupingAttributes = namedGroupingExpressions.values.map(_.toAttribute).toSeq
- Some(partialComputation, rewrittenAggregateExpressions, namedGroupingAttributes)
+ def getOneAttribute(plan: LogicalPlan): AttributeReference = {
+ var relation: LogicalRelation = null
+ plan collect {
+ case l: LogicalRelation => relation = l
+ }
+ if (relation != null) {
+ relation.output.find { p =>
+ p.dataType match {
+ case n: NumericType => true
+ case _ => false
+ }
+ }.getOrElse(relation.output.head)
} else {
- None
+ null
}
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
index 79d8ffa..2bf50da 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
import scala.language.implicitConversions
import org.apache.spark.SparkContext
+import org.apache.spark.sql.catalyst.ParserDialect
import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog}
import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -52,7 +53,7 @@ class CarbonContext(val sc: SparkContext, val storePath: String) extends HiveCon
override protected[sql] lazy val optimizer: Optimizer =
new CarbonOptimizer(DefaultOptimizer, conf)
- override protected[sql] def dialectClassName = classOf[CarbonSQLDialect].getCanonicalName
+ protected[sql] override def getSQLDialect(): ParserDialect = new CarbonSQLDialect(this)
experimental.extraStrategies = CarbonStrategy.getStrategy(self)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
index f95acf4..94b38a4 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
@@ -24,6 +24,7 @@ import scala.language.implicitConversions
import org.apache.hadoop.fs.Path
import org.apache.spark._
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical._
@@ -56,7 +57,11 @@ class CarbonSource
case _ =>
val options = new CarbonOption(parameters)
val tableIdentifier = options.tableIdentifier.split("""\.""").toSeq
- CarbonDatasourceRelation(tableIdentifier, None)(sqlContext)
+ val ident = tableIdentifier match {
+ case Seq(name) => TableIdentifier(name)
+ case Seq(db, name) => TableIdentifier(name, Some(db))
+ }
+ CarbonDatasourceRelation(ident, None)(sqlContext)
}
}
@@ -120,14 +125,14 @@ class CarbonSource
* This relation is stored to hive metastore
*/
private[sql] case class CarbonDatasourceRelation(
- tableIdentifier: Seq[String],
+ tableIdentifier: TableIdentifier,
alias: Option[String])
(@transient context: SQLContext)
extends BaseRelation with Serializable with Logging {
def carbonRelation: CarbonRelation = {
CarbonEnv.getInstance(context)
- .carbonCatalog.lookupRelation2(tableIdentifier, None)(sqlContext)
+ .carbonCatalog.lookupRelation1(tableIdentifier, None)(sqlContext)
.asInstanceOf[CarbonRelation]
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index a4ac246..600519f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -32,6 +32,7 @@ import org.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueI
import org.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.carbondata.core.carbon.metadata.datatype.DataType
import org.carbondata.core.carbon.metadata.encoder.Encoding
+import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
import org.carbondata.query.carbon.util.DataTypeUtil
/**
@@ -62,7 +63,7 @@ case class CarbonDictionaryDecoder(
!carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
canBeDecoded(attr)) {
val newAttr = AttributeReference(a.name,
- convertCarbonToSparkDataType(carbonDimension.getDataType),
+ convertCarbonToSparkDataType(carbonDimension),
a.nullable,
a.metadata)(a.exprId,
a.qualifiers).asInstanceOf[Attribute]
@@ -88,8 +89,8 @@ case class CarbonDictionaryDecoder(
}
}
- def convertCarbonToSparkDataType(dataType: DataType): types.DataType = {
- dataType match {
+ def convertCarbonToSparkDataType(carbonDimension: CarbonDimension): types.DataType = {
+ carbonDimension.getDataType match {
case DataType.STRING => StringType
case DataType.INT => IntegerType
case DataType.LONG => LongType
@@ -125,6 +126,9 @@ case class CarbonDictionaryDecoder(
dictIds
}
+
+ override def outputsUnsafeRows: Boolean = true
+
override def doExecute(): RDD[InternalRow] = {
attachTree(this, "execute") {
val storePath = sqlContext.catalog.asInstanceOf[CarbonMetastoreCatalog].storePath
@@ -143,20 +147,21 @@ case class CarbonDictionaryDecoder(
val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers,
forwardDictionaryCache)
new Iterator[InternalRow] {
+ val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
override final def hasNext: Boolean = iter.hasNext
override final def next(): InternalRow = {
val row: InternalRow = iter.next()
val data = row.toSeq(dataTypes).toArray
for (i <- data.indices) {
- if (dicts(i) != null) {
+ if (dicts(i) != null && data(i) != null) {
data(i) = toType(DataTypeUtil
.getDataBasedOnDataType(dicts(i)
.getDictionaryValueForKey(data(i).asInstanceOf[Integer]),
getDictionaryColumnIds(i)._3))
}
}
- new GenericMutableRow(data)
+ unsafeProjection(new GenericMutableRow(data))
}
}
}