You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by jb...@apache.org on 2016/06/23 14:16:04 UTC
[16/56] [abbrv] incubator-carbondata git commit: Refactored core
package and fixed all testcases (#684)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/impl/MapBasedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/impl/MapBasedResult.java b/core/src/main/java/org/carbondata/query/carbon/result/impl/MapBasedResult.java
deleted file mode 100644
index 42c3027..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/result/impl/MapBasedResult.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.query.carbon.result.impl;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.query.aggregator.MeasureAggregator;
-import org.carbondata.query.carbon.result.Result;
-import org.carbondata.query.carbon.wrappers.ByteArrayWrapper;
-
-/**
- * To store aggregated result
- */
-public class MapBasedResult implements Result<Map<ByteArrayWrapper, MeasureAggregator[]>> {
- /**
- * iterator over result
- */
- private Iterator<Entry<ByteArrayWrapper, MeasureAggregator[]>> resultIterator;
-
- /**
- * result entry
- */
- private Entry<ByteArrayWrapper, MeasureAggregator[]> resultEntry;
-
- /**
- * scanned result
- */
- private Map<ByteArrayWrapper, MeasureAggregator[]> scannerResult;
-
- /**
- * total number of result
- */
- private int resulSize;
-
- public MapBasedResult() {
- scannerResult = new HashMap<ByteArrayWrapper, MeasureAggregator[]>(
- CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- this.resultIterator = scannerResult.entrySet().iterator();
- }
-
- /**
- * @return the key
- */
- @Override public ByteArrayWrapper getKey() {
- resultEntry = this.resultIterator.next();
- return resultEntry.getKey();
- }
-
- /**
- * return the value
- */
- @Override public MeasureAggregator[] getValue() {
- return resultEntry.getValue();
- }
-
- /**
- * Method to check more result is present
- * or not
- */
- @Override public boolean hasNext() {
- return this.resultIterator.hasNext();
- }
-
- /***
- * below method will be used to merge the
- * scanned result
- *
- * @param otherResult return to be merged
- */
- @Override public void addScannedResult(Map<ByteArrayWrapper, MeasureAggregator[]> scannerResult) {
- this.scannerResult = scannerResult;
- resulSize = scannerResult.size();
- this.resultIterator = scannerResult.entrySet().iterator();
- }
-
- /***
- * below method will be used to merge the
- * scanned result, in case of map based the
- * result we need to aggregate the result
- *
- * @param otherResult return to be merged
- */
- @Override public void merge(Result<Map<ByteArrayWrapper, MeasureAggregator[]>> result) {
- ByteArrayWrapper key = null;
- MeasureAggregator[] value = null;
- Map<ByteArrayWrapper, MeasureAggregator[]> otherResult = result.getResult();
- if (otherResult != null) {
- while (resultIterator.hasNext()) {
- Entry<ByteArrayWrapper, MeasureAggregator[]> entry = resultIterator.next();
- key = entry.getKey();
- value = entry.getValue();
- MeasureAggregator[] agg = otherResult.get(key);
- if (agg != null) {
- for (int j = 0; j < agg.length; j++) {
- agg[j].merge(value[j]);
- }
- } else {
- otherResult.put(key, value);
- }
- }
- resulSize = otherResult.size();
- this.resultIterator = otherResult.entrySet().iterator();
- this.scannerResult = otherResult;
- }
- }
-
- /**
- * Return the size of the result
- */
- @Override public int size() {
- return resulSize;
- }
-
- /**
- * @return the complete result
- */
- @Override public Map<ByteArrayWrapper, MeasureAggregator[]> getResult() {
- return this.scannerResult;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/iterator/AbstractDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/carbondata/query/carbon/result/iterator/AbstractDetailQueryResultIterator.java
index bd56ec2..09fa50c 100644
--- a/core/src/main/java/org/carbondata/query/carbon/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/carbondata/query/carbon/result/iterator/AbstractDetailQueryResultIterator.java
@@ -39,7 +39,7 @@ import org.carbondata.query.carbon.model.QueryModel;
* executing that query are returning a iterator over block and every time next
* call will come it will execute the block and return the result
*/
-public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterator<E> {
+public abstract class AbstractDetailQueryResultIterator extends CarbonIterator {
/**
* LOGGER.
@@ -75,7 +75,7 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
/**
* current counter to check how blocklet has been executed
*/
- private long currentCounter;
+ protected long currentCounter;
/**
* keep the track of number of blocklet of a block has been executed
@@ -138,7 +138,7 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
return currentCounter < totalNumberOfNode;
}
- protected void updateSliceIndexToBeExecuted() {
+ protected int updateSliceIndexToBeExecuted() {
Arrays.fill(blockIndexToBeExecuted, -1);
int currentSliceIndex = 0;
int i = 0;
@@ -154,7 +154,7 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
break;
}
}
- currentCounter += i;
+ return i;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkBasedDetailResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkBasedDetailResultIterator.java b/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkBasedDetailResultIterator.java
new file mode 100644
index 0000000..826f816
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkBasedDetailResultIterator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.carbon.result.iterator;
+
+import java.util.List;
+
+import org.carbondata.core.iterator.CarbonIterator;
+import org.carbondata.query.carbon.executor.impl.QueryExecutorProperties;
+import org.carbondata.query.carbon.model.QueryModel;
+import org.carbondata.query.carbon.result.BatchResult;
+import org.carbondata.query.carbon.result.ListBasedResultWrapper;
+import org.carbondata.query.carbon.result.Result;
+import org.carbondata.query.carbon.result.preparator.QueryResultPreparator;
+import org.carbondata.query.carbon.result.preparator.impl.DetailQueryResultPreparatorImpl;
+
+/**
+ * Iterator over chunk result
+ */
+public class ChunkBasedDetailResultIterator extends CarbonIterator<BatchResult> {
+
+ /**
+ * query result preparator which will be used to create a query result
+ */
+ private QueryResultPreparator<List<ListBasedResultWrapper>, Object> queryResultPreparator;
+
+ /**
+ * iterator over result
+ */
+ private CarbonIterator<Result> queryResultIterator;
+
+ public ChunkBasedDetailResultIterator(CarbonIterator<Result> queryResultIterator,
+ QueryExecutorProperties executerProperties, QueryModel queryModel) {
+ this.queryResultIterator = queryResultIterator;
+ this.queryResultPreparator =
+ new DetailQueryResultPreparatorImpl(executerProperties, queryModel);
+
+ }
+
+ /**
+ * Returns {@code true} if the iteration has more elements. (In other words,
+ * returns {@code true} if {@link #next} would return an element.)
+ *
+ * @return {@code true} if the iteration has more elements
+ */
+ @Override public boolean hasNext() {
+ return queryResultIterator.hasNext();
+ }
+
+ /**
+ * Returns the next element in the iteration.
+ *
+ * @return the next element in the iteration
+ */
+ @Override public BatchResult next() {
+ return queryResultPreparator.prepareQueryResult(queryResultIterator.next());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkBasedResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkBasedResultIterator.java b/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkBasedResultIterator.java
deleted file mode 100644
index 71b311f..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkBasedResultIterator.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.query.carbon.result.iterator;
-
-import org.carbondata.core.iterator.CarbonIterator;
-import org.carbondata.query.carbon.executor.impl.QueryExecutorProperties;
-import org.carbondata.query.carbon.model.QueryModel;
-import org.carbondata.query.carbon.result.BatchResult;
-import org.carbondata.query.carbon.result.Result;
-import org.carbondata.query.carbon.result.preparator.QueryResultPreparator;
-import org.carbondata.query.carbon.result.preparator.impl.QueryResultPreparatorImpl;
-
-/**
- * Iterator over chunk result
- */
-public class ChunkBasedResultIterator extends CarbonIterator<BatchResult> {
-
- /**
- * query result prepartor which will be used to create a query result
- */
- private QueryResultPreparator<BatchResult> queryResultPreparator;
-
- /**
- * iterator over result
- */
- private CarbonIterator<Result> queryResultIterator;
-
- public ChunkBasedResultIterator(CarbonIterator<Result> queryResultIterator,
- QueryExecutorProperties executerProperties, QueryModel queryModel) {
- this.queryResultIterator = queryResultIterator;
- this.queryResultPreparator = new QueryResultPreparatorImpl(executerProperties, queryModel);
-
- }
-
- /**
- * Returns {@code true} if the iteration has more elements. (In other words,
- * returns {@code true}
- *
- * @return {@code true} if the iteration has more elements
- */
- @Override public boolean hasNext() {
- return queryResultIterator.hasNext();
- }
-
- /**
- * Returns the next element in the iteration.
- *
- * @return the next element in the iteration
- */
- @Override public BatchResult next() {
- return queryResultPreparator.prepareQueryResult(queryResultIterator.next());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkRawRowIterartor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkRawRowIterartor.java b/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkRawRowIterartor.java
deleted file mode 100644
index ea4d65c..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkRawRowIterartor.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package org.carbondata.query.carbon.result.iterator;
-
-import org.carbondata.core.iterator.CarbonIterator;
-import org.carbondata.query.carbon.result.BatchRawResult;
-
-public class ChunkRawRowIterartor extends CarbonIterator<Object[]> {
-
- /**
- * iterator over chunk result
- */
- private CarbonIterator<BatchRawResult> iterator;
-
- /**
- * currect chunk
- */
- private BatchRawResult currentchunk;
-
- public ChunkRawRowIterartor(CarbonIterator<BatchRawResult> iterator) {
- this.iterator = iterator;
- if (iterator.hasNext()) {
- currentchunk = iterator.next();
- }
- }
-
- /**
- * Returns {@code true} if the iteration has more elements. (In other words,
- * returns {@code true} if {@link #next} would return an element rather than
- * throwing an exception.)
- *
- * @return {@code true} if the iteration has more elements
- */
- @Override public boolean hasNext() {
- if (null != currentchunk) {
- if ((currentchunk.hasNext())) {
- return true;
- } else if (!currentchunk.hasNext()) {
- while (iterator.hasNext()) {
- currentchunk = iterator.next();
- if (currentchunk != null && currentchunk.hasNext()) {
- return true;
- }
- }
- }
- }
- return false;
- }
-
- /**
- * Returns the next element in the iteration.
- *
- * @return the next element in the iteration
- */
- @Override public Object[] next() {
- return currentchunk.next();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkRowIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkRowIterator.java b/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkRowIterator.java
index 6ba54cd..3db3404 100644
--- a/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkRowIterator.java
+++ b/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkRowIterator.java
@@ -21,12 +21,11 @@ package org.carbondata.query.carbon.result.iterator;
import org.carbondata.core.iterator.CarbonIterator;
import org.carbondata.query.carbon.result.BatchResult;
-import org.carbondata.query.carbon.result.RowResult;
/**
* Iterator over row result
*/
-public class ChunkRowIterator extends CarbonIterator<RowResult> {
+public class ChunkRowIterator extends CarbonIterator<Object[]> {
/**
* iterator over chunk result
@@ -73,7 +72,7 @@ public class ChunkRowIterator extends CarbonIterator<RowResult> {
*
* @return the next element in the iteration
*/
- @Override public RowResult next() {
+ @Override public Object[] next() {
return currentchunk.next();
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailQueryResultIterator.java b/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailQueryResultIterator.java
index f07eb20..3641e75 100644
--- a/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailQueryResultIterator.java
+++ b/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailQueryResultIterator.java
@@ -29,16 +29,17 @@ import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
import org.carbondata.query.carbon.executor.internal.InternalQueryExecutor;
import org.carbondata.query.carbon.model.QueryModel;
import org.carbondata.query.carbon.result.BatchResult;
+import org.carbondata.query.carbon.result.ListBasedResultWrapper;
import org.carbondata.query.carbon.result.Result;
import org.carbondata.query.carbon.result.preparator.QueryResultPreparator;
-import org.carbondata.query.carbon.result.preparator.impl.QueryResultPreparatorImpl;
+import org.carbondata.query.carbon.result.preparator.impl.DetailQueryResultPreparatorImpl;
/**
* In case of detail query we cannot keep all the records in memory so for
* executing that query are returning a iterator over block and every time next
* call will come it will execute the block and return the result
*/
-public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator<BatchResult> {
+public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator {
/**
* LOGGER.
@@ -49,17 +50,18 @@ public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator
/**
* to prepare the result
*/
- private QueryResultPreparator<BatchResult> queryResultPreparator;
+ private QueryResultPreparator<List<ListBasedResultWrapper>, Object> queryResultPreparator;
public DetailQueryResultIterator(List<BlockExecutionInfo> infos,
QueryExecutorProperties executerProperties, QueryModel queryModel,
InternalQueryExecutor queryExecutor) {
super(infos, executerProperties, queryModel, queryExecutor);
- this.queryResultPreparator = new QueryResultPreparatorImpl(executerProperties, queryModel);
+ this.queryResultPreparator =
+ new DetailQueryResultPreparatorImpl(executerProperties, queryModel);
}
@Override public BatchResult next() {
- updateSliceIndexToBeExecuted();
+ currentCounter += updateSliceIndexToBeExecuted();
CarbonIterator<Result> result = null;
try {
result = executor.executeQuery(blockExecutionInfos, blockIndexToBeExecuted);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailRawQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailRawQueryResultIterator.java b/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailRawQueryResultIterator.java
index 4f9dbe2..2b14793 100644
--- a/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailRawQueryResultIterator.java
+++ b/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailRawQueryResultIterator.java
@@ -19,6 +19,10 @@
package org.carbondata.query.carbon.result.iterator;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.carbondata.core.iterator.CarbonIterator;
import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
@@ -26,7 +30,8 @@ import org.carbondata.query.carbon.executor.impl.QueryExecutorProperties;
import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
import org.carbondata.query.carbon.executor.internal.InternalQueryExecutor;
import org.carbondata.query.carbon.model.QueryModel;
-import org.carbondata.query.carbon.result.BatchRawResult;
+import org.carbondata.query.carbon.result.BatchResult;
+import org.carbondata.query.carbon.result.ListBasedResultWrapper;
import org.carbondata.query.carbon.result.Result;
import org.carbondata.query.carbon.result.preparator.QueryResultPreparator;
import org.carbondata.query.carbon.result.preparator.impl.RawQueryResultPreparatorImpl;
@@ -36,10 +41,13 @@ import org.carbondata.query.carbon.result.preparator.impl.RawQueryResultPreparat
* executing that query are returning a iterator over block and every time next
* call will come it will execute the block and return the result
*/
-public class DetailRawQueryResultIterator
- extends AbstractDetailQueryResultIterator<BatchRawResult> {
+public class DetailRawQueryResultIterator extends AbstractDetailQueryResultIterator {
- private QueryResultPreparator<BatchRawResult> queryResultPreparator;
+ private ExecutorService execService = Executors.newFixedThreadPool(1);
+
+ private Future<ResultInfo> future;
+
+ private QueryResultPreparator<List<ListBasedResultWrapper>, Object> queryResultPreparator;
public DetailRawQueryResultIterator(List<BlockExecutionInfo> infos,
QueryExecutorProperties executerProperties, QueryModel queryModel,
@@ -48,26 +56,63 @@ public class DetailRawQueryResultIterator
this.queryResultPreparator = new RawQueryResultPreparatorImpl(executerProperties, queryModel);
}
- @Override public BatchRawResult next() {
- updateSliceIndexToBeExecuted();
- CarbonIterator<Result> result = null;
- try {
- result = executor.executeQuery(blockExecutionInfos, blockIndexToBeExecuted);
- } catch (QueryExecutionException ex) {
- throw new RuntimeException(ex.getCause());
+ @Override public BatchResult next() {
+ BatchResult result;
+ if (future == null) {
+ future = execute();
}
- for (int i = 0; i < blockIndexToBeExecuted.length; i++) {
- if (blockIndexToBeExecuted[i] != -1) {
- blockExecutionInfos.get(blockIndexToBeExecuted[i]).setFirstDataBlock(
- blockExecutionInfos.get(blockIndexToBeExecuted[i]).getFirstDataBlock()
- .getNextDataRefNode());
- }
+ ResultInfo resultFromFuture = getResultFromFuture(future);
+ result = resultFromFuture.result;
+ currentCounter += resultFromFuture.counter;
+ if (hasNext()) {
+ future = execute();
}
- if (null != result) {
- Result next = result.next();
- return queryResultPreparator.prepareQueryResult(next);
- } else {
- return queryResultPreparator.prepareQueryResult(null);
+ return result;
+ }
+
+ private ResultInfo getResultFromFuture(Future<ResultInfo> future) {
+ try {
+ return future.get();
+ } catch (Exception e) {
+ e.printStackTrace();
}
+ return new ResultInfo();
+ }
+
+ private Future<ResultInfo> execute() {
+ return execService.submit(new Callable<ResultInfo>() {
+ @Override public ResultInfo call() {
+ CarbonIterator<Result> result = null;
+ int counter = updateSliceIndexToBeExecuted();
+ try {
+ result = executor.executeQuery(blockExecutionInfos, blockIndexToBeExecuted);
+ } catch (QueryExecutionException ex) {
+ throw new RuntimeException(ex.getCause());
+ }
+ for (int i = 0; i < blockIndexToBeExecuted.length; i++) {
+ if (blockIndexToBeExecuted[i] != -1) {
+ blockExecutionInfos.get(blockIndexToBeExecuted[i]).setFirstDataBlock(
+ blockExecutionInfos.get(blockIndexToBeExecuted[i]).getFirstDataBlock()
+ .getNextDataRefNode());
+ }
+ }
+ BatchResult batchResult;
+ if (null != result) {
+ Result next = result.next();
+ batchResult = queryResultPreparator.prepareQueryResult(next);
+ } else {
+ batchResult = queryResultPreparator.prepareQueryResult(null);
+ }
+ ResultInfo resultInfo = new ResultInfo();
+ resultInfo.counter = counter;
+ resultInfo.result = batchResult;
+ return resultInfo;
+ }
+ });
+ }
+
+ private static class ResultInfo {
+ private int counter;
+ private BatchResult result;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/preparator/QueryResultPreparator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/preparator/QueryResultPreparator.java b/core/src/main/java/org/carbondata/query/carbon/result/preparator/QueryResultPreparator.java
index 431e163..fbf3074 100644
--- a/core/src/main/java/org/carbondata/query/carbon/result/preparator/QueryResultPreparator.java
+++ b/core/src/main/java/org/carbondata/query/carbon/result/preparator/QueryResultPreparator.java
@@ -1,9 +1,10 @@
package org.carbondata.query.carbon.result.preparator;
+import org.carbondata.query.carbon.result.BatchResult;
import org.carbondata.query.carbon.result.Result;
-public interface QueryResultPreparator<E> {
+public interface QueryResultPreparator<K, V> {
- public E prepareQueryResult(Result scannedResult);
+ public BatchResult prepareQueryResult(Result<K, V> scannedResult);
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/AbstractQueryResultPreparator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/AbstractQueryResultPreparator.java b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/AbstractQueryResultPreparator.java
index 1890baf..ad5b4c5 100644
--- a/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/AbstractQueryResultPreparator.java
+++ b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/AbstractQueryResultPreparator.java
@@ -1,17 +1,19 @@
package org.carbondata.query.carbon.result.preparator.impl;
-import java.util.ArrayList;
import java.util.List;
-import org.carbondata.query.aggregator.MeasureAggregator;
+import org.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.carbondata.core.util.CarbonUtil;
import org.carbondata.query.carbon.executor.impl.QueryExecutorProperties;
+import org.carbondata.query.carbon.model.QueryDimension;
import org.carbondata.query.carbon.model.QueryModel;
import org.carbondata.query.carbon.result.BatchResult;
import org.carbondata.query.carbon.result.preparator.QueryResultPreparator;
-import org.carbondata.query.scanner.impl.CarbonKey;
-import org.carbondata.query.scanner.impl.CarbonValue;
+import org.carbondata.query.carbon.util.DataTypeUtil;
-public abstract class AbstractQueryResultPreparator<E> implements QueryResultPreparator<E> {
+public abstract class AbstractQueryResultPreparator<K, V> implements QueryResultPreparator<K, V> {
/**
* query properties
@@ -29,13 +31,35 @@ public abstract class AbstractQueryResultPreparator<E> implements QueryResultPre
this.queryModel = queryModel;
}
- protected void fillMeasureValueForAggGroupByQuery(QueryModel queryModel,
- Object[][] surrogateResult, int dimensionCount, int columnIndex, MeasureAggregator[] v) {
- int msrCount = queryModel.getQueryMeasures().size();
- for (int i = 0; i < msrCount; i++) {
- v[queryExecuterProperties.measureStartIndex + i] =
- ((MeasureAggregator) surrogateResult[dimensionCount
- + queryExecuterProperties.measureStartIndex + i][columnIndex]);
+ protected void fillDimensionData(Object[][] convertedResult, List<QueryDimension> queryDimensions,
+ int dimensionCount, Object[] row, int rowIndex) {
+ QueryDimension queryDimension;
+ for (int i = 0; i < dimensionCount; i++) {
+ queryDimension = queryDimensions.get(i);
+ if (!CarbonUtil
+ .hasEncoding(queryDimension.getDimension().getEncoder(), Encoding.DICTIONARY)) {
+ row[queryDimension.getQueryOrder()] = convertedResult[i][rowIndex];
+ } else if (CarbonUtil
+ .hasEncoding(queryDimension.getDimension().getEncoder(), Encoding.DIRECT_DICTIONARY)) {
+ DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+ .getDirectDictionaryGenerator(queryDimension.getDimension().getDataType());
+ row[queryDimension.getQueryOrder()] = directDictionaryGenerator
+ .getValueFromSurrogate((Integer) convertedResult[i][rowIndex]);
+ } else {
+ if (queryExecuterProperties.sortDimIndexes[i] == 1) {
+ row[queryDimension.getQueryOrder()] = DataTypeUtil.getDataBasedOnDataType(
+ queryExecuterProperties.columnToDictionayMapping
+ .get(queryDimension.getDimension().getColumnId())
+ .getDictionaryValueFromSortedIndex((Integer) convertedResult[i][rowIndex]),
+ queryDimension.getDimension().getDataType());
+ } else {
+ row[queryDimension.getQueryOrder()] = DataTypeUtil.getDataBasedOnDataType(
+ queryExecuterProperties.columnToDictionayMapping
+ .get(queryDimension.getDimension().getColumnId())
+ .getDictionaryValueForKey((Integer) convertedResult[i][rowIndex]),
+ queryDimension.getDimension().getDataType());
+ }
+ }
}
}
@@ -54,18 +78,9 @@ public abstract class AbstractQueryResultPreparator<E> implements QueryResultPre
}
protected BatchResult getEmptyChunkResult(int size) {
- List<CarbonKey> keys = new ArrayList<CarbonKey>(size);
- List<CarbonValue> values = new ArrayList<CarbonValue>(size);
- Object[] row = new Object[1];
- for (int i = 0; i < size; i++)
-
- {
- values.add(new CarbonValue(new MeasureAggregator[0]));
- keys.add(new CarbonKey(row));
- }
+ Object[][] row = new Object[size][1];
BatchResult chunkResult = new BatchResult();
- chunkResult.setKeys(keys);
- chunkResult.setValues(values);
+ chunkResult.setRows(row);
return chunkResult;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/DetailQueryResultPreparatorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/DetailQueryResultPreparatorImpl.java b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/DetailQueryResultPreparatorImpl.java
new file mode 100644
index 0000000..712894a
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/DetailQueryResultPreparatorImpl.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.query.carbon.result.preparator.impl;
+
+import java.nio.charset.Charset;
+import java.util.List;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.core.util.CarbonUtil;
+import org.carbondata.query.carbon.executor.impl.QueryExecutorProperties;
+import org.carbondata.query.carbon.model.QueryDimension;
+import org.carbondata.query.carbon.model.QueryMeasure;
+import org.carbondata.query.carbon.model.QueryModel;
+import org.carbondata.query.carbon.result.BatchResult;
+import org.carbondata.query.carbon.result.ListBasedResultWrapper;
+import org.carbondata.query.carbon.result.Result;
+import org.carbondata.query.carbon.util.DataTypeUtil;
+import org.carbondata.query.carbon.wrappers.ByteArrayWrapper;
+
+/**
+ * Prepares detail-query results by converting scanned data to actual values, e.g. decoding
+ * dictionary surrogate keys and converting no-dictionary bytes to the column data type.
+ *
+ * @TODO many things in this class are confusing; check why they were handled this way and
+ * how they can be handled better. If aggregation is pushed down to the Spark layer and the
+ * data can be processed in byte-array format, this class will not be needed and can be
+ * deleted in future.
+ * @TODO expose an interface that returns the result in the required form, for example an
+ * implementation that returns the converted result or one that returns the result directly
+ * without converting to actual values.
+ */
+public class DetailQueryResultPreparatorImpl
+    extends AbstractQueryResultPreparator<List<ListBasedResultWrapper>, Object> {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DetailQueryResultPreparatorImpl.class.getName());
+
+  /**
+   * Charset for decoding no-dictionary column bytes, resolved once rather than per cell.
+   */
+  private static final Charset NO_DICTIONARY_CHARSET =
+      Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET);
+
+  public DetailQueryResultPreparatorImpl(QueryExecutorProperties executerProperties,
+      QueryModel queryModel) {
+    super(executerProperties, queryModel);
+  }
+
+  /**
+   * Converts one scanned result into a batch of rows.
+   *
+   * @param scannedResult scanned records; for each record the key holds the dimension data
+   *                      and the value array holds the measure data
+   * @return an empty result when nothing was scanned, an empty-chunk result when the query
+   * projects no columns and is not a function query, otherwise one row per scanned record
+   */
+  @Override public BatchResult prepareQueryResult(
+      Result<List<ListBasedResultWrapper>, Object> scannedResult) {
+    if (null == scannedResult || scannedResult.size() < 1) {
+      return new BatchResult();
+    }
+    List<QueryDimension> queryDimension = queryModel.getQueryDimension();
+    int dimensionCount = queryDimension.size();
+    int measureCount = queryExecuterProperties.measureDataTypes.length;
+    int totalNumberOfColumn = dimensionCount + measureCount;
+    // scannedResult.size() >= 1 here, so it does not need to be re-checked
+    if (!queryExecuterProperties.isFunctionQuery && totalNumberOfColumn == 0) {
+      return getEmptyChunkResult(scannedResult.size());
+    }
+    // row-major buffer: resultData[row][column], dimension columns first, then measures
+    Object[][] resultData = new Object[scannedResult.size()][totalNumberOfColumn];
+    int currentRow = 0;
+    while (scannedResult.hasNext()) {
+      ByteArrayWrapper key = scannedResult.getKey();
+      Object[] value = scannedResult.getValue();
+      // NOTE(review): when key is null the dimension cells stay null; confirm that the
+      // dictionary decoding in fillDimensionData (which casts cells to Integer) tolerates it
+      if (key != null) {
+        fillDimensionColumns(key, queryDimension, resultData[currentRow]);
+      }
+      if (value != null) {
+        System.arraycopy(value, 0, resultData[currentRow], dimensionCount, measureCount);
+      }
+      currentRow++;
+    }
+    if (resultData.length > 0) {
+      resultData = encodeToRows(resultData);
+    }
+    return getResult(queryModel, resultData);
+  }
+
+  /**
+   * Fills the dimension columns of one row from the scanned key. Dictionary-encoded
+   * dimensions receive their surrogate key as an int; other dimensions receive their
+   * no-dictionary bytes converted to the dimension's data type.
+   *
+   * @param key            scanned key of the record
+   * @param queryDimension projected dimensions in query order
+   * @param row            destination row; column i receives dimension i
+   */
+  private void fillDimensionColumns(ByteArrayWrapper key, List<QueryDimension> queryDimension,
+      Object[] row) {
+    long[] surrogateResult = queryExecuterProperties.keyStructureInfo.getKeyGenerator()
+        .getKeyArray(key.getDictionaryKey(),
+            queryExecuterProperties.keyStructureInfo.getMaskedBytes());
+    // no-dictionary values are consumed sequentially, one per no-dictionary dimension
+    int noDictionaryColumnIndex = 0;
+    for (int i = 0; i < queryDimension.size(); i++) {
+      QueryDimension dimension = queryDimension.get(i);
+      if (!CarbonUtil.hasEncoding(dimension.getDimension().getEncoder(), Encoding.DICTIONARY)) {
+        row[i] = DataTypeUtil.getDataBasedOnDataType(
+            new String(key.getNoDictionaryKeyByIndex(noDictionaryColumnIndex++),
+                NO_DICTIONARY_CHARSET), dimension.getDimension().getDataType());
+      } else {
+        row[i] = (int) surrogateResult[dimension.getDimension().getKeyOrdinal()];
+      }
+    }
+  }
+
+  /**
+   * Builds the final rows from column-major data.
+   *
+   * @param queryModel      query model supplying the dimensions and the measure query order
+   * @param convertedResult column-major data, {@code convertedResult[column][row]}, with the
+   *                        dimension columns first followed by the measure columns
+   * @return batch result holding one row per record
+   */
+  private BatchResult getResult(QueryModel queryModel, Object[][] convertedResult) {
+    // with no projected columns there may be no column to take the record count from
+    int rowSize = convertedResult.length == 0 ? 0 : convertedResult[0].length;
+    Object[][] rows = new Object[rowSize][];
+    List<QueryDimension> queryDimensions = queryModel.getQueryDimension();
+    List<QueryMeasure> queryMeasures = queryModel.getQueryMeasures();
+    int dimensionCount = queryDimensions.size();
+    int msrCount = queryExecuterProperties.measureDataTypes.length;
+    for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) {
+      Object[] row = new Object[dimensionCount + msrCount];
+      fillDimensionData(convertedResult, queryDimensions, dimensionCount, row, rowIndex);
+      for (int i = 0; i < queryMeasures.size(); i++) {
+        QueryMeasure msr = queryMeasures.get(i);
+        row[msr.getQueryOrder()] = convertedResult[dimensionCount + i][rowIndex];
+      }
+      rows[rowIndex] = row;
+    }
+    LOGGER.info(
+        "###########################################------ Total Number of records: " + rowSize);
+    BatchResult chunkResult = new BatchResult();
+    chunkResult.setRows(rows);
+    return chunkResult;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/QueryResultPreparatorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/QueryResultPreparatorImpl.java b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/QueryResultPreparatorImpl.java
deleted file mode 100644
index 5604ecd..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/QueryResultPreparatorImpl.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.query.carbon.result.preparator.impl;
-
-import java.math.BigDecimal;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.carbon.metadata.encoder.Encoding;
-import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
-import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
-import org.carbondata.core.util.CarbonUtil;
-import org.carbondata.query.aggregator.MeasureAggregator;
-import org.carbondata.query.aggregator.impl.count.CountAggregator;
-import org.carbondata.query.aggregator.impl.distinct.DistinctCountAggregator;
-import org.carbondata.query.aggregator.impl.distinct.DistinctStringCountAggregator;
-import org.carbondata.query.carbon.executor.impl.QueryExecutorProperties;
-import org.carbondata.query.carbon.model.DimensionAggregatorInfo;
-import org.carbondata.query.carbon.model.QueryDimension;
-import org.carbondata.query.carbon.model.QueryMeasure;
-import org.carbondata.query.carbon.model.QueryModel;
-import org.carbondata.query.carbon.result.BatchResult;
-import org.carbondata.query.carbon.result.Result;
-import org.carbondata.query.carbon.util.DataTypeUtil;
-import org.carbondata.query.carbon.wrappers.ByteArrayWrapper;
-import org.carbondata.query.scanner.impl.CarbonKey;
-import org.carbondata.query.scanner.impl.CarbonValue;
-
-/**
- * Below class will be used to get the result by converting to actual data
- * Actual data conversion can be converting the surrogate key to actual data
- *
- * @TODO there are many things in class which is very confusing, need to check
- * why it was handled like that and how we can handle that in a better
- * way.Need to revisit this class. IF aggregation is push down to spark
- * layer and if we can process the data in byte array format then this
- * class wont be useful so in future we can delete this class.
- * @TODO need to expose one interface which will return the result based on required type
- * for example its implementation case return converted result or directly result with out
- * converting to actual value
- */
-public class QueryResultPreparatorImpl extends AbstractQueryResultPreparator<BatchResult> {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(QueryResultPreparatorImpl.class.getName());
-
- public QueryResultPreparatorImpl(QueryExecutorProperties executerProperties,
- QueryModel queryModel) {
- super(executerProperties, queryModel);
- }
-
- @Override public BatchResult prepareQueryResult(Result scannedResult) {
- if ((null == scannedResult || scannedResult.size() < 1)) {
- return new BatchResult();
- }
- List<QueryDimension> queryDimension = queryModel.getQueryDimension();
- int dimensionCount = queryDimension.size();
- int totalNumberOfColumn = dimensionCount + queryExecuterProperties.measureAggregators.length;
- Object[][] resultData = new Object[scannedResult.size()][totalNumberOfColumn];
- if (!queryExecuterProperties.isFunctionQuery && totalNumberOfColumn == 0
- && scannedResult.size() > 0) {
- return getEmptyChunkResult(scannedResult.size());
- }
- int currentRow = 0;
- long[] surrogateResult = null;
- int noDictionaryColumnIndex = 0;
- ByteArrayWrapper key = null;
- MeasureAggregator[] value = null;
- while (scannedResult.hasNext()) {
- key = scannedResult.getKey();
- value = scannedResult.getValue();
- surrogateResult = queryExecuterProperties.keyStructureInfo.getKeyGenerator()
- .getKeyArray(key.getDictionaryKey(),
- queryExecuterProperties.keyStructureInfo.getMaskedBytes());
- for (int i = 0; i < dimensionCount; i++) {
- if (!CarbonUtil
- .hasEncoding(queryDimension.get(i).getDimension().getEncoder(), Encoding.DICTIONARY)) {
- resultData[currentRow][i] = DataTypeUtil.getDataBasedOnDataType(
- new String(key.getNoDictionaryKeyByIndex(noDictionaryColumnIndex++),
- Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)),
- queryDimension.get(i).getDimension().getDataType());
- } else {
- resultData[currentRow][i] =
- (int) surrogateResult[queryDimension.get(i).getDimension().getKeyOrdinal()];
- }
- }
-
- // @TODO need to check why it was handled like this
- if (queryExecuterProperties.isFunctionQuery) {
- if (value[0].toString().contains("Long")) {
- Long sizeOfListL = value[0].getLongValue();
- return getEmptyChunkResult(sizeOfListL.intValue());
- } else if (value[0].toString().contains("Decimal")) {
- BigDecimal sizeOfListD = value[0].getBigDecimalValue();
- return getEmptyChunkResult(sizeOfListD.intValue());
- } else {
- Double sizeOfList = value[0].getDoubleValue();
- return getEmptyChunkResult(sizeOfList.intValue());
- }
-
- }
- for (int i = 0; i < queryExecuterProperties.measureAggregators.length; i++) {
- resultData[currentRow][dimensionCount + i] = value[i];
- }
- currentRow++;
- noDictionaryColumnIndex = 0;
- }
- if (resultData.length > 0) {
- resultData = encodeToRows(resultData);
- }
- return getResult(queryModel, resultData);
- }
-
- private BatchResult getResult(QueryModel queryModel, Object[][] convertedResult) {
-
- List<CarbonKey> keys = new ArrayList<CarbonKey>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- List<CarbonValue> values =
- new ArrayList<CarbonValue>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- List<QueryDimension> queryDimensions = queryModel.getQueryDimension();
- int dimensionCount = queryDimensions.size();
- int msrCount = queryExecuterProperties.measureAggregators.length;
- Object[][] resultDataA = null;
- // @TODO no sure why this check is here as in the caller of this method
- // is returning in case of
- // function query. Need to confirm with other developer who handled this
- // scneario
- if (queryExecuterProperties.isFunctionQuery) {
- msrCount = 1;
- resultDataA = new Object[dimensionCount + msrCount][msrCount];
- } else {
- resultDataA = new Object[dimensionCount + msrCount][convertedResult[0].length];
- }
- Object[] row = null;
- QueryDimension queryDimension = null;
- for (int columnIndex = 0; columnIndex < resultDataA[0].length; columnIndex++) {
- row = new Object[dimensionCount + msrCount];
- for (int i = 0; i < dimensionCount; i++) {
- queryDimension = queryDimensions.get(i);
- if (!CarbonUtil
- .hasEncoding(queryDimension.getDimension().getEncoder(), Encoding.DICTIONARY)) {
- row[queryDimension.getQueryOrder()] = convertedResult[i][columnIndex];
- } else if (CarbonUtil
- .hasEncoding(queryDimension.getDimension().getEncoder(), Encoding.DIRECT_DICTIONARY)) {
- DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
- .getDirectDictionaryGenerator(queryDimension.getDimension().getDataType());
- row[queryDimension.getQueryOrder()] = directDictionaryGenerator
- .getValueFromSurrogate((Integer) convertedResult[i][columnIndex]);
- } else {
- if (queryExecuterProperties.sortDimIndexes[i] == 1) {
- row[queryDimension.getQueryOrder()] = DataTypeUtil.getDataBasedOnDataType(
- queryExecuterProperties.columnToDictionayMapping
- .get(queryDimension.getDimension().getColumnId())
- .getDictionaryValueFromSortedIndex((Integer) convertedResult[i][columnIndex]),
- queryDimension.getDimension().getDataType());
- } else {
- row[queryDimension.getQueryOrder()] = DataTypeUtil.getDataBasedOnDataType(
- queryExecuterProperties.columnToDictionayMapping
- .get(queryDimension.getDimension().getColumnId())
- .getDictionaryValueForKey((Integer) convertedResult[i][columnIndex]),
- queryDimension.getDimension().getDataType());
- }
- }
- }
- MeasureAggregator[] msrAgg =
- new MeasureAggregator[queryExecuterProperties.measureAggregators.length];
-
- fillMeasureValueForAggGroupByQuery(queryModel, convertedResult, dimensionCount, columnIndex,
- msrAgg);
- fillDimensionAggValue(queryModel, convertedResult, dimensionCount, columnIndex, msrAgg);
-
- if (!queryModel.isDetailQuery()) {
- for (int i = 0; i < queryModel.getQueryMeasures().size(); i++) {
- row[queryModel.getQueryMeasures().get(i).getQueryOrder()] =
- msrAgg[queryExecuterProperties.measureStartIndex + i].get();
- }
- int index = 0;
- for (int i = 0; i < queryModel.getDimAggregationInfo().size(); i++) {
- DimensionAggregatorInfo dimensionAggregatorInfo =
- queryModel.getDimAggregationInfo().get(i);
- for (int j = 0; j < dimensionAggregatorInfo.getOrderList().size(); j++) {
- row[dimensionAggregatorInfo.getOrderList().get(j)] = msrAgg[index++].get();
- }
- }
- for (int i = 0; i < queryModel.getExpressions().size(); i++) {
- row[queryModel.getExpressions().get(i).getQueryOrder()] =
- ((MeasureAggregator) convertedResult[dimensionCount
- + queryExecuterProperties.aggExpressionStartIndex + i][columnIndex]).get();
- }
- } else {
- QueryMeasure msr = null;
- for (int i = 0; i < queryModel.getQueryMeasures().size(); i++) {
- msr = queryModel.getQueryMeasures().get(i);
- if (msrAgg[queryExecuterProperties.measureStartIndex + i].isFirstTime()) {
- row[msr.getQueryOrder()] = null;
- } else {
- Object msrVal;
- switch (msr.getMeasure().getDataType()) {
- case LONG:
- msrVal = msrAgg[queryExecuterProperties.measureStartIndex + i].getLongValue();
- break;
- case DECIMAL:
- msrVal = msrAgg[queryExecuterProperties.measureStartIndex + i].getBigDecimalValue();
- break;
- default:
- msrVal = msrAgg[queryExecuterProperties.measureStartIndex + i].getDoubleValue();
- }
- row[msr.getQueryOrder()] = DataTypeUtil
- .getMeasureDataBasedOnDataType(msrVal,msr.getMeasure().getDataType());
- }
- }
- }
- values.add(new CarbonValue(new MeasureAggregator[0]));
- keys.add(new CarbonKey(row));
- }
- LOGGER.info("###########################################------ Total Number of records"
- + resultDataA[0].length);
- BatchResult chunkResult = new BatchResult();
- chunkResult.setKeys(keys);
- chunkResult.setValues(values);
- return chunkResult;
- }
-
- private void fillDimensionAggValue(QueryModel queryModel, Object[][] surrogateResult,
- int dimensionCount, int columnIndex, MeasureAggregator[] v) {
- Iterator<DimensionAggregatorInfo> dimAggInfoIterator =
- queryModel.getDimAggregationInfo().iterator();
- DimensionAggregatorInfo dimensionAggregatorInfo = null;
- List<String> partitionColumns = queryModel.getParitionColumns();
- int rowIndex = -1;
- int index = 0;
- while (dimAggInfoIterator.hasNext()) {
- dimensionAggregatorInfo = dimAggInfoIterator.next();
- for (int j = 0; j < dimensionAggregatorInfo.getAggList().size(); j++) {
- ++rowIndex;
- if (!dimensionAggregatorInfo.getAggList().get(j)
- .equals(CarbonCommonConstants.DISTINCT_COUNT)) {
- v[index++] =
- ((MeasureAggregator) surrogateResult[dimensionCount + rowIndex][columnIndex]);
- } else if (partitionColumns.size() == 1 && partitionColumns
- .contains(dimensionAggregatorInfo.getColumnName()) && dimensionAggregatorInfo
- .getAggList().get(j).equals(CarbonCommonConstants.DISTINCT_COUNT)) {
- double value =
- ((MeasureAggregator) surrogateResult[dimensionCount + rowIndex][columnIndex])
- .getDoubleValue();
-
- MeasureAggregator countAggregator = new CountAggregator();
- countAggregator.setNewValue(value);
- v[index++] = countAggregator;
- } else {
- if (surrogateResult[dimensionCount
- + rowIndex][columnIndex] instanceof DistinctCountAggregator) {
-
- Iterator<Integer> iterator =
- ((DistinctCountAggregator) surrogateResult[dimensionCount + rowIndex][columnIndex])
- .getBitMap().iterator();
-
- MeasureAggregator distinctCountAggregatorObjct = new DistinctStringCountAggregator();
- while (iterator.hasNext()) {
- String member = queryExecuterProperties.columnToDictionayMapping
- .get(dimensionAggregatorInfo.getDim().getColumnId())
- .getDictionaryValueForKey(iterator.next());
- if (!member.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
- distinctCountAggregatorObjct.agg(member);
- }
- }
- v[index++] = distinctCountAggregatorObjct;
- } else {
- v[index++] =
- ((MeasureAggregator) surrogateResult[dimensionCount + rowIndex][columnIndex]);
- }
- }
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/RawQueryResultPreparatorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/RawQueryResultPreparatorImpl.java b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/RawQueryResultPreparatorImpl.java
index 0eb60ff..0ae6651 100644
--- a/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/RawQueryResultPreparatorImpl.java
+++ b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/RawQueryResultPreparatorImpl.java
@@ -1,22 +1,24 @@
package org.carbondata.query.carbon.result.preparator.impl;
+import java.util.List;
+
import org.carbondata.common.logging.LogService;
import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.query.aggregator.MeasureAggregator;
import org.carbondata.query.carbon.executor.impl.QueryExecutorProperties;
import org.carbondata.query.carbon.model.QueryDimension;
import org.carbondata.query.carbon.model.QueryMeasure;
import org.carbondata.query.carbon.model.QueryModel;
import org.carbondata.query.carbon.model.QuerySchemaInfo;
import org.carbondata.query.carbon.result.BatchRawResult;
+import org.carbondata.query.carbon.result.BatchResult;
+import org.carbondata.query.carbon.result.ListBasedResultWrapper;
import org.carbondata.query.carbon.result.Result;
-import org.carbondata.query.carbon.util.DataTypeUtil;
-import org.carbondata.query.carbon.wrappers.ByteArrayWrapper;
/**
* It does not decode the dictionary.
*/
-public class RawQueryResultPreparatorImpl extends AbstractQueryResultPreparator<BatchRawResult> {
+public class RawQueryResultPreparatorImpl
+ extends AbstractQueryResultPreparator<List<ListBasedResultWrapper>, Object> {
private static final LogService LOGGER =
LogServiceFactory.getLogService(RawQueryResultPreparatorImpl.class.getName());
@@ -33,7 +35,7 @@ public class RawQueryResultPreparatorImpl extends AbstractQueryResultPreparator<
.toArray(new QueryDimension[queryModel.getQueryDimension().size()]));
querySchemaInfo.setQueryMeasures(queryModel.getQueryMeasures()
.toArray(new QueryMeasure[queryModel.getQueryMeasures().size()]));
- int msrSize = queryExecuterProperties.measureAggregators.length;
+ int msrSize = queryExecuterProperties.measureDataTypes.length;
int dimensionCount = queryModel.getQueryDimension().size();
int[] queryOrder = new int[dimensionCount + msrSize];
int[] queryReverseOrder = new int[dimensionCount + msrSize];
@@ -49,75 +51,34 @@ public class RawQueryResultPreparatorImpl extends AbstractQueryResultPreparator<
querySchemaInfo.setQueryReverseOrder(queryReverseOrder);
}
- @Override public BatchRawResult prepareQueryResult(Result scannedResult) {
+ @Override public BatchResult prepareQueryResult(
+ Result<List<ListBasedResultWrapper>, Object> scannedResult) {
 if ((null == scannedResult || scannedResult.size() < 1)) {
- BatchRawResult batchRawResult = new BatchRawResult(new Object[0][0]);
+ BatchRawResult batchRawResult = new BatchRawResult();
 batchRawResult.setQuerySchemaInfo(querySchemaInfo);
 return batchRawResult;
 }
- int msrSize = queryExecuterProperties.measureAggregators.length;
- int totalNumberOfColumn = msrSize + 1;
- Object[][] resultData = new Object[scannedResult.size()][totalNumberOfColumn];
- int currentRow = 0;
- ByteArrayWrapper key = null;
- MeasureAggregator[] value = null;
+ int msrSize = queryExecuterProperties.measureDataTypes.length;
+ Object[][] resultData = new Object[scannedResult.size()][]; // row layout: [key, measures...]
+ Object[] value;
+ Object[] row;
+ int counter = 0;
 while (scannedResult.hasNext()) {
- key = scannedResult.getKey();
 value = scannedResult.getValue();
- resultData[currentRow][0] = key;
- for (int i = 0; i < msrSize; i++) {
- resultData[currentRow][1 + i] = value[i];
+ row = new Object[msrSize + 1];
+ row[0] = scannedResult.getKey(); // NOTE(review): read after getValue(); confirm order is irrelevant
+ if (value != null) {
+ System.arraycopy(value, 0, row, 1, msrSize);
 }
- currentRow++;
- }
-
- if (resultData.length > 0) {
- resultData = encodeToRows(resultData);
+ resultData[counter] = row;
+ counter++;
 }
- BatchRawResult result = getResult(queryModel, resultData);
+ LOGGER.info("###########################---- Total Number of records: " + scannedResult.size());
+ BatchRawResult result = new BatchRawResult();
+ result.setRows(resultData);
 result.setQuerySchemaInfo(querySchemaInfo);
 return result;
 }
-
- private BatchRawResult getResult(QueryModel queryModel, Object[][] convertedResult) {
-
- int msrCount = queryExecuterProperties.measureAggregators.length;
- Object[][] resultDataA = new Object[1 + msrCount][convertedResult[0].length];
-
- for (int columnIndex = 0; columnIndex < resultDataA[0].length; columnIndex++) {
- resultDataA[0][columnIndex] = convertedResult[0][columnIndex];
- MeasureAggregator[] msrAgg =
- new MeasureAggregator[queryExecuterProperties.measureAggregators.length];
-
- fillMeasureValueForAggGroupByQuery(queryModel, convertedResult, 1, columnIndex, msrAgg);
-
- QueryMeasure msr = null;
- for (int i = 0; i < queryModel.getQueryMeasures().size(); i++) {
- msr = queryModel.getQueryMeasures().get(i);
- if (msrAgg[queryExecuterProperties.measureStartIndex + i].isFirstTime()) {
- resultDataA[i + 1][columnIndex] = null;
- } else {
- Object msrVal;
- switch (msr.getMeasure().getDataType()) {
- case LONG:
- msrVal = msrAgg[queryExecuterProperties.measureStartIndex + i].getLongValue();
- break;
- case DECIMAL:
- msrVal = msrAgg[queryExecuterProperties.measureStartIndex + i].getBigDecimalValue();
- break;
- default:
- msrVal = msrAgg[queryExecuterProperties.measureStartIndex + i].getDoubleValue();
- }
- resultDataA[i + 1][columnIndex] = DataTypeUtil
- .getMeasureDataBasedOnDataType(msrVal,
- msr.getMeasure().getDataType());
- }
- }
- }
- LOGGER.info("###########################################------ Total Number of records"
- + resultDataA[0].length);
- return new BatchRawResult(resultDataA);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/filter/executer/RowLevelFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/filter/executer/RowLevelFilterExecuterImpl.java b/core/src/main/java/org/carbondata/query/filter/executer/RowLevelFilterExecuterImpl.java
index ca7c77a..0377580 100644
--- a/core/src/main/java/org/carbondata/query/filter/executer/RowLevelFilterExecuterImpl.java
+++ b/core/src/main/java/org/carbondata/query/filter/executer/RowLevelFilterExecuterImpl.java
@@ -35,8 +35,6 @@ import org.carbondata.core.constants.CarbonCommonConstants;
import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
import org.carbondata.core.util.CarbonUtil;
-import org.carbondata.query.aggregator.MeasureAggregator;
-import org.carbondata.query.aggregator.util.MeasureAggregatorFactory;
import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
import org.carbondata.query.carbon.processor.BlocksChunkHolder;
import org.carbondata.query.carbon.util.DataTypeUtil;
@@ -209,44 +207,25 @@ public class RowLevelFilterExecuterImpl implements FilterExecuter {
if (!msrColumnEvalutorInfo.isMeasureExistsInCurrentSlice()) {
record[msrColumnEvalutorInfo.getRowIndex()] = msrColumnEvalutorInfo.getDefaultValue();
} else {
- if (msrColumnEvalutorInfo.isCustomMeasureValue()) {
- MeasureAggregator aggregator = MeasureAggregatorFactory
- .getAggregator(msrColumnEvalutorInfo.getAggregator(),
- msrColumnEvalutorInfo.getType());
- aggregator.merge(
- blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
- .getMeasureDataHolder().getReadableByteArrayValueByIndex(index));
- switch (msrType) {
- case LONG:
- record[msrColumnEvalutorInfo.getRowIndex()] = aggregator.getLongValue();
- break;
- case DECIMAL:
- record[msrColumnEvalutorInfo.getRowIndex()] = aggregator.getBigDecimalValue();
- break;
- default:
- record[msrColumnEvalutorInfo.getRowIndex()] = aggregator.getDoubleValue();
- }
- } else {
- Object msrValue;
- switch (msrType) {
- case LONG:
- msrValue =
- blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
- .getMeasureDataHolder().getReadableLongValueByIndex(index);
- break;
- case DECIMAL:
- msrValue =
- blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
- .getMeasureDataHolder().getReadableBigDecimalValueByIndex(index);
- break;
- default:
- msrValue =
- blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
- .getMeasureDataHolder().getReadableDoubleValueByIndex(index);
- }
- record[msrColumnEvalutorInfo.getRowIndex()] = msrValue;
-
+ Object msrValue;
+ switch (msrType) {
+ case LONG:
+ msrValue =
+ blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
+ .getMeasureDataHolder().getReadableLongValueByIndex(index);
+ break;
+ case DECIMAL:
+ msrValue =
+ blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
+ .getMeasureDataHolder().getReadableBigDecimalValueByIndex(index);
+ break;
+ default:
+ msrValue =
+ blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
+ .getMeasureDataHolder().getReadableDoubleValueByIndex(index);
}
+ record[msrColumnEvalutorInfo.getRowIndex()] = msrValue;
+
}
}
row.setValues(record);
@@ -275,7 +254,7 @@ public class RowLevelFilterExecuterImpl implements FilterExecuter {
* Read the actual filter member by passing the dictionary value from
* the forward dictionary cache which which holds column wise cache
*
- * @param dimColumnEvaluaatorInfo
+ * @param dimColumnEvaluatorInfo
* @param dictionaryValue
* @param forwardDictionary
* @return
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/filter/resolver/resolverinfo/MeasureColumnResolvedFilterInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/filter/resolver/resolverinfo/MeasureColumnResolvedFilterInfo.java b/core/src/main/java/org/carbondata/query/filter/resolver/resolverinfo/MeasureColumnResolvedFilterInfo.java
index 9cefbdb..e6877d5 100644
--- a/core/src/main/java/org/carbondata/query/filter/resolver/resolverinfo/MeasureColumnResolvedFilterInfo.java
+++ b/core/src/main/java/org/carbondata/query/filter/resolver/resolverinfo/MeasureColumnResolvedFilterInfo.java
@@ -31,8 +31,6 @@ public class MeasureColumnResolvedFilterInfo implements Serializable {
private int rowIndex = -1;
- private boolean isCustomMeasureValue;
-
private Object uniqueValue;
private String aggregator;
@@ -59,14 +57,6 @@ public class MeasureColumnResolvedFilterInfo implements Serializable {
this.rowIndex = rowIndex;
}
- public boolean isCustomMeasureValue() {
- return isCustomMeasureValue;
- }
-
- public void setCustomMeasureValue(boolean isCustomMeasureValue) {
- this.isCustomMeasureValue = isCustomMeasureValue;
- }
-
public Object getUniqueValue() {
return uniqueValue;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/scanner/impl/CarbonKey.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/scanner/impl/CarbonKey.java b/core/src/main/java/org/carbondata/query/scanner/impl/CarbonKey.java
deleted file mode 100644
index 7f6d7f2..0000000
--- a/core/src/main/java/org/carbondata/query/scanner/impl/CarbonKey.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- *
- */
-package org.carbondata.query.scanner.impl;
-
-import java.io.Serializable;
-import java.util.Arrays;
-
-/**
- * @author R00900208
- */
-public class CarbonKey implements Serializable, Comparable<CarbonKey> {
-
- /**
- *
- */
- private static final long serialVersionUID = -8773813519739848506L;
-
- private Object[] key;
-
- public CarbonKey(Object[] key) {
- this.key = key;
- }
-
- /**
- * @return the key
- */
- public Object[] getKey() {
- return key;
- }
-
- public CarbonKey getSubKey(int size) {
- Object[] crop = new Object[size];
- System.arraycopy(key, 0, crop, 0, size);
- return new CarbonKey(crop);
- }
-
- /* (non-Javadoc)
- * @see java.lang.Object#hashCode()
- */
- @Override public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + Arrays.hashCode(key);
- return result;
- }
-
- /* (non-Javadoc)
- * @see java.lang.Object#equals(java.lang.Object)
- */
- @Override public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- CarbonKey other = (CarbonKey) obj;
- if (!Arrays.equals(key, other.key)) {
- return false;
- }
- return true;
- }
-
- @Override public String toString() {
- return Arrays.toString(key);
- }
-
- @Override public int compareTo(CarbonKey other) {
- Object[] oKey = other.key;
-
- int l = 0;
- for (int i = 0; i < key.length; i++) {
- l = ((Comparable) key[i]).compareTo(oKey[i]);
- if (l != 0) {
- return l;
- }
- }
-
- return 0;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/scanner/impl/CarbonValue.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/scanner/impl/CarbonValue.java b/core/src/main/java/org/carbondata/query/scanner/impl/CarbonValue.java
deleted file mode 100644
index 68e7226..0000000
--- a/core/src/main/java/org/carbondata/query/scanner/impl/CarbonValue.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.query.scanner.impl;
-
-import java.io.Serializable;
-import java.util.Arrays;
-
-import org.carbondata.query.aggregator.MeasureAggregator;
-
-public class CarbonValue implements Serializable, Comparable<CarbonValue> {
-
- /**
- *
- */
- private static final long serialVersionUID = 8034398963696130423L;
-
- private MeasureAggregator[] values;
-
- private int topNIndex;
-
- public CarbonValue(MeasureAggregator[] values) {
- this.values = values;
- }
-
- /**
- * @return the values
- */
- public MeasureAggregator[] getValues() {
- return values;
- }
-
- public CarbonValue merge(CarbonValue another) {
- for (int i = 0; i < values.length; i++) {
- values[i].merge(another.values[i]);
- }
- return this;
- }
-
- public void setTopNIndex(int index) {
- this.topNIndex = index;
- }
-
- public void addGroup(CarbonKey key, CarbonValue value) {
-
- }
-
- public CarbonValue mergeKeyVal(CarbonValue another) {
- return another;
- }
-
- @Override public String toString() {
- return Arrays.toString(values);
- }
-
- @Override public int compareTo(CarbonValue o) {
- return values[topNIndex].compareTo(o.values[topNIndex]);
- }
-
- @Override public boolean equals(Object obj) {
- if(!(obj instanceof CarbonValue)) {
- return false;
- }
- CarbonValue o = (CarbonValue)obj;
- return values[topNIndex].equals(o.values[o.topNIndex]);
- }
-
- @Override public int hashCode() {
- return values[topNIndex].hashCode();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/hadoop/src/main/java/org/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/carbondata/hadoop/CarbonRecordReader.java
index 8ea0104..3d54d96 100644
--- a/hadoop/src/main/java/org/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/carbondata/hadoop/CarbonRecordReader.java
@@ -13,8 +13,8 @@ import org.carbondata.hadoop.readsupport.CarbonReadSupport;
import org.carbondata.query.carbon.executor.QueryExecutorFactory;
import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
import org.carbondata.query.carbon.model.QueryModel;
-import org.carbondata.query.carbon.result.BatchRawResult;
-import org.carbondata.query.carbon.result.iterator.ChunkRawRowIterartor;
+import org.carbondata.query.carbon.result.BatchResult;
+import org.carbondata.query.carbon.result.iterator.ChunkRowIterator;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
@@ -48,8 +48,8 @@ public class CarbonRecordReader<T> extends RecordReader<Void, T> {
readSupport
.intialize(queryModel.getProjectionColumns(), queryModel.getAbsoluteTableIdentifier());
try {
- carbonIterator = new ChunkRawRowIterartor(
- (CarbonIterator<BatchRawResult>) QueryExecutorFactory.getQueryExecutor(queryModel)
+ carbonIterator = new ChunkRowIterator(
+ (CarbonIterator<BatchResult>) QueryExecutorFactory.getQueryExecutor(queryModel)
.execute(queryModel));
} catch (QueryExecutionException e) {
throw new InterruptedException(e.getMessage());
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index f728a32..b6f589d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -17,17 +17,12 @@
package org.apache.spark.sql
-import scala.collection.mutable.MutableList
-
-import org.apache.spark.sql.agg.{CarbonAverage, CarbonCount}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, _}
import org.apache.spark.sql.execution.command.tableModel
-import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.optimizer.{CarbonAliasDecoderRelation, CarbonDecoderRelation}
import org.apache.spark.sql.types._
@@ -202,328 +197,3 @@ case class FakeCarbonCast(child: Literal, dataType: DataType)
override def eval(input: InternalRow): Any = child.value
}
-
/**
 * A pattern that matches any number of project, filter, aggregate, sort or limit operations on
 * top of another relational operator. All filter conditions are collected and split into their
 * conjuncts. They are returned together with the top project list, the aggregate expressions,
 * the sort order and the limit expression.
 * [[org.apache.spark.sql.catalyst.expressions.Alias Aliases]] are in-lined/substituted if
 * necessary.
 */
object PhysicalOperation1 extends PredicateHelper {

  /**
   * The tuple is (output fields, filter conjuncts, aggregate expressions, sort order, limit,
   * leaf plan).
   */
  type ReturnType = (Seq[NamedExpression], Seq[Expression], Option[Seq[Expression]],
    Option[Seq[SortOrder]], Option[Expression], LogicalPlan)

  /**
   * Destructures `plan`. This always returns `Some`. When no Project was found, the leaf plan's
   * output is used as the field list.
   */
  def apply(plan: LogicalPlan): Option[ReturnType] = {
    val (fields, filters, child, _, groupby, sortOrder, limit) =
      collectProjectsAndFilters(plan)

    Some((fields.getOrElse(child.output), filters, groupby, sortOrder, limit, child))
  }

  /**
   * Collects projects and filters, in-lining/substituting aliases if necessary. Here are two
   * examples for alias in-lining/substitution. Before:
   * {{{
   *   SELECT c1 FROM (SELECT key AS c1 FROM t1) t2 WHERE c1 > 10
   *   SELECT c1 AS c2 FROM (SELECT key AS c1 FROM t1) t2 WHERE c1 > 10
   * }}}
   * After:
   * {{{
   *   SELECT key AS c1 FROM t1 WHERE key > 10
   *   SELECT key AS c2 FROM t1 WHERE key > 10
   * }}}
   *
   * Note: the fifth tuple element (named `groupby` at call sites) holds the aggregate list of an
   * [[Aggregate]] node, not its grouping expressions.
   */
  def collectProjectsAndFilters(plan: LogicalPlan):
  (Option[Seq[NamedExpression]], Seq[Expression], LogicalPlan,
    Map[Attribute, Expression], Option[Seq[Expression]],
    Option[Seq[SortOrder]], Option[Expression]) = {
    plan match {
      case Project(fields, child) =>
        val (_, filters, other, aliases, groupby, sortOrder, limit) =
          collectProjectsAndFilters(child)
        val substitutedFields = fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]]
        (Some(substitutedFields), filters, other, collectAliases(substitutedFields), groupby,
          sortOrder, limit)

      case Filter(condition, child) =>
        val (fields, filters, other, aliases, groupby, sortOrder, limit) =
          collectProjectsAndFilters(child)
        val substitutedCondition = substitute(aliases)(condition)
        (fields, filters ++ splitConjunctivePredicates(substitutedCondition), other, aliases,
          groupby, sortOrder, limit)

      case Aggregate(_, aggregateExpressions, child) =>
        val (fields, filters, other, aliases, _, sortOrder, limit) =
          collectProjectsAndFilters(child)
        (fields, filters, other, aliases ++ collectAliases(aggregateExpressions),
          Some(aggregateExpressions), sortOrder, limit)

      case Sort(order, _, child) =>
        val (fields, filters, other, aliases, groupby, _, limit) = collectProjectsAndFilters(child)
        val substitutedOrder = order.map(s => SortOrder(substitute(aliases)(s.child), s.direction))
        (fields, filters, other, aliases, groupby, Some(substitutedOrder), limit)

      case Limit(limitExpr, child) =>
        val (fields, filters, other, aliases, groupby, sortOrder, _) =
          collectProjectsAndFilters(child)
        (fields, filters, other, aliases, groupby, sortOrder, Some(limitExpr))

      case other =>
        (None, Nil, other, Map.empty, None, None, None)
    }
  }

  /**
   * Returns every [[AggregateExpression]] reachable from `expr`, in pre-order. The search looks
   * through aliases and does not descend into an aggregate expression once found.
   */
  def findAggreagateExpression(expr: Expression): Seq[AggregateExpression] = expr match {
    case d: AggregateExpression => d :: Nil
    case Alias(ref, _) => findAggreagateExpression(ref)
    case other => other.children.flatMap(findAggreagateExpression)
  }

  /**
   * Variant of [[collectProjectsAndFilters]] whose top-level [[Aggregate]] case keeps only the
   * entries of the aggregate list that are [[AggregateExpression]]s (after unwrapping aliases).
   *
   * NOTE(review): recursion into children goes through `collectProjectsAndFilters`, not this
   * method. Nested aggregates below the top therefore report the full aggregate list. Confirm
   * this is intended.
   */
  def collectProjectsAndFilters1(plan: LogicalPlan):
  (Option[Seq[NamedExpression]], Seq[Expression], LogicalPlan, Map[Attribute, Expression],
    Option[Seq[Expression]], Option[Seq[SortOrder]], Option[Expression]) = {
    plan match {
      case Project(fields, child) =>
        val (_, filters, other, aliases, groupby, sortOrder, limit) =
          collectProjectsAndFilters(child)
        val substitutedFields = fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]]
        (Some(substitutedFields), filters, other, collectAliases(substitutedFields), groupby,
          sortOrder, limit)

      case Filter(condition, child) =>
        val (fields, filters, other, aliases, groupby, sortOrder, limit) =
          collectProjectsAndFilters(child)
        val substitutedCondition = substitute(aliases)(condition)
        (fields, filters ++ splitConjunctivePredicates(substitutedCondition), other, aliases,
          groupby, sortOrder, limit)

      case Aggregate(_, aggregateExpressions, child) =>
        val (fields, filters, other, aliases, _, sortOrder, limit) =
          collectProjectsAndFilters(child)
        val aggExps = aggregateExpressions.map {
          case Alias(ref, _) => ref
          case others => others
        }.filter(_.isInstanceOf[AggregateExpression])
        (fields, filters, other, aliases ++ collectAliases(aggregateExpressions), Some(aggExps),
          sortOrder, limit)

      case Sort(order, _, child) =>
        val (fields, filters, other, aliases, groupby, _, limit) = collectProjectsAndFilters(child)
        val substitutedOrder = order.map(s => SortOrder(substitute(aliases)(s.child), s.direction))
        (fields, filters, other, aliases, groupby, Some(substitutedOrder), limit)

      case Limit(limitExpr, child) =>
        val (fields, filters, other, aliases, groupby, sortOrder, _) =
          collectProjectsAndFilters(child)
        (fields, filters, other, aliases, groupby, sortOrder, Some(limitExpr))

      case other =>
        (None, Nil, other, Map.empty, None, None, None)
    }
  }

  /** Maps each alias's output attribute to the expression it names. */
  private def collectAliases(fields: Seq[Expression]) = {
    fields.collect {
      case a@Alias(child, _) => a.toAttribute -> child
    }.toMap
  }

  /**
   * Replaces attribute references that point at a known alias with the aliased expression. The
   * original name and exprId are kept by re-wrapping the result in an [[Alias]].
   */
  private def substitute(aliases: Map[Attribute, Expression])(expr: Expression) = {
    expr.transform {
      case a@Alias(ref: AttributeReference, name) =>
        aliases.get(ref).map(Alias(_, name)(a.exprId, a.qualifiers)).getOrElse(a)

      case a: AttributeReference =>
        aliases.get(a).map(Alias(_, a.name)(a.exprId, a.qualifiers)).getOrElse(a)
    }
  }
}
-
/**
 * Leaf expression that stands in for `expr`.
 *
 * Once a row ordinal has been assigned with `setPosition`, evaluation reads that ordinal from the
 * input row as `intermediateDataType`. Until then (position is -1), `expr` is evaluated directly.
 */
case class PositionLiteral(expr: Expression, intermediateDataType: DataType)
  extends LeafExpression with CodegenFallback {

  type EvaluatedType = Any

  /** Row ordinal to read from; -1 means no ordinal has been assigned. */
  var position = -1

  override def dataType: DataType = expr.dataType

  override def nullable: Boolean = false

  def setPosition(pos: Int): Unit = {
    position = pos
  }

  override def toString: String = s"PositionLiteral($position : $expr)"

  override def eval(input: InternalRow): Any =
    if (position == -1) expr.eval(input) else input.get(position, intermediateDataType)
}
-
/**
 * Matches a logical `Aggregate`. Unless the caller flags a detail query, it rewrites the
 * aggregate expressions so that they read values from fixed positions of the incoming row.
 *
 * Supported forms are avg / count / sum / min / max over an attribute or a cast attribute, plus
 * count over a literal. For an entry at position `i` of the aggregate list, the input of such an
 * aggregate is replaced by a `PositionLiteral` that reads ordinal `i`. Avg and count are
 * additionally replaced by `CarbonAverage` and `CarbonCount`. The rewrite is attempted only when
 * every aggregate-list entry is either a plain attribute or an aliased aggregate expression.
 *
 * The returned values for this match are:
 *  - Grouping expressions of the aggregate.
 *  - Aggregate expressions (rewritten as above, or unchanged).
 *  - Input to the aggregation.
 */
object CarbonAggregation {
  type ReturnType = (Seq[Expression], Seq[NamedExpression], LogicalPlan)

  /**
   * Rewrites each expression via [[convertAggregate]] using its index in the list as the row
   * ordinal. The list is returned unchanged when it contains an entry that
   * [[canBeConvertedToCarbonAggregate]] rejects.
   */
  private def convertAggregatesForPushdown(convertUnknown: Boolean,
      rewrittenAggregateExpressions: Seq[Expression],
      oneAttr: AttributeReference): Seq[Expression] = {
    if (canBeConvertedToCarbonAggregate(rewrittenAggregateExpressions)) {
      rewrittenAggregateExpressions.zipWithIndex.map { case (expr, index) =>
        convertAggregate(expr, index, convertUnknown, oneAttr)
      }
    } else {
      rewrittenAggregateExpressions
    }
  }

  /** Builds a `PositionLiteral` for `expr` that reads ordinal `index` as `dataType`. */
  def makePositionLiteral(expr: Expression, index: Int, dataType: DataType): PositionLiteral = {
    val posLiteral = PositionLiteral(expr, dataType)
    posLiteral.setPosition(index)
    posLiteral
  }

  /**
   * Rewrites the aggregate inside `current` to read ordinal `index` of the input row. `current`
   * is returned unchanged when `convertUnknown` is true or [[canBeConverted]] rejects it.
   *
   * NOTE(review): `oneAttr` may be null (see [[getOneAttribute]]). A `count(<literal>)` would then
   * fail inside `transformLongType`. Confirm callers never reach this with a relation-less plan.
   */
  def convertAggregate(current: Expression,
      index: Int,
      convertUnknown: Boolean,
      oneAttr: AttributeReference): Expression = {
    if (!convertUnknown && canBeConverted(current)) {
      current.transform {
        case Average(attr: AttributeReference) =>
          val convertedDataType = transformArrayType(attr)
          CarbonAverage(makePositionLiteral(convertedDataType, index, convertedDataType.dataType))
        case Average(Cast(attr: AttributeReference, _)) =>
          val convertedDataType = transformArrayType(attr)
          CarbonAverage(makePositionLiteral(convertedDataType, index, convertedDataType.dataType))
        case Count(Seq(s: Literal)) =>
          CarbonCount(s, Some(makePositionLiteral(transformLongType(oneAttr), index, LongType)))
        case Count(Seq(attr: AttributeReference)) =>
          CarbonCount(makePositionLiteral(transformLongType(attr), index, LongType))
        case Sum(attr: AttributeReference) =>
          Sum(makePositionLiteral(attr, index, attr.dataType))
        case Sum(Cast(attr: AttributeReference, dataType)) =>
          Sum(Cast(makePositionLiteral(attr, index, attr.dataType), dataType))
        case Min(attr: AttributeReference) =>
          Min(makePositionLiteral(attr, index, attr.dataType))
        case Min(Cast(attr: AttributeReference, dataType)) =>
          Min(Cast(makePositionLiteral(attr, index, attr.dataType), dataType))
        case Max(attr: AttributeReference) =>
          Max(makePositionLiteral(attr, index, attr.dataType))
        case Max(Cast(attr: AttributeReference, dataType)) =>
          Max(Cast(makePositionLiteral(attr, index, attr.dataType), dataType))
      }
    } else {
      current
    }
  }

  /**
   * True for an aliased avg/count/sum/min/max over an attribute (or cast attribute), or a count
   * over a literal. The `AggregateExpression`'s third field must be `false` (presumably its
   * `isDistinct` flag; confirm against the Spark version in use).
   */
  def canBeConverted(current: Expression): Boolean = current match {
    case Alias(AggregateExpression(Average(_: AttributeReference), _, false), _) => true
    case Alias(AggregateExpression(Average(Cast(_: AttributeReference, _)), _, false), _) => true
    case Alias(AggregateExpression(Count(Seq(_: Literal)), _, false), _) => true
    case Alias(AggregateExpression(Count(Seq(_: AttributeReference)), _, false), _) => true
    case Alias(AggregateExpression(Sum(_: AttributeReference), _, false), _) => true
    case Alias(AggregateExpression(Sum(Cast(_: AttributeReference, _)), _, false), _) => true
    case Alias(AggregateExpression(Min(_: AttributeReference), _, false), _) => true
    case Alias(AggregateExpression(Min(Cast(_: AttributeReference, _)), _, false), _) => true
    case Alias(AggregateExpression(Max(_: AttributeReference), _, false), _) => true
    case Alias(AggregateExpression(Max(Cast(_: AttributeReference, _)), _, false), _) => true
    case _ => false
  }

  /** Copy of `attr` (same name, exprId and qualifiers) typed as `ArrayType(DoubleType)`. */
  def transformArrayType(attr: AttributeReference): AttributeReference = {
    AttributeReference(attr.name, ArrayType(DoubleType), attr.nullable, attr.metadata)(attr.exprId,
      attr.qualifiers)
  }

  /** Copy of `attr` (same name, exprId and qualifiers) typed as `LongType`. */
  def transformLongType(attr: AttributeReference): AttributeReference = {
    AttributeReference(attr.name, LongType, attr.nullable, attr.metadata)(attr.exprId,
      attr.qualifiers)
  }

  /**
   * True when every expression is a plain attribute or an aliased aggregate expression.
   *
   * There should be sync between carbonOperators validation and here. We should not convert to
   * carbon aggregates if the validation does not satisfy.
   */
  def canBeConvertedToCarbonAggregate(expressions: Seq[Expression]): Boolean =
    expressions.forall {
      case _: AttributeReference => true
      case Alias(_: AggregateExpression, _) => true
      case _ => false
    }

  def unapply(plan: LogicalPlan): Option[ReturnType] = unapply((plan, false))

  /**
   * @param combinedPlan the plan to match, paired with a flag that is `true` for a detail query.
   *                     For a detail query the aggregate expressions are returned unchanged.
   */
  def unapply(combinedPlan: (LogicalPlan, Boolean)): Option[ReturnType] = {
    val (plan, isDetailQuery) = combinedPlan
    plan match {
      case Aggregate(groupingExpressions, aggregateExpressionsOrig, child) =>
        val aggregateExpressions =
          if (isDetailQuery) {
            aggregateExpressionsOrig
          } else {
            convertAggregatesForPushdown(false, aggregateExpressionsOrig, getOneAttribute(plan))
          }
        Some((groupingExpressions, aggregateExpressions.asInstanceOf[Seq[NamedExpression]], child))
      case _ => None
    }
  }

  /**
   * Picks a representative attribute from the last `LogicalRelation` visited in `plan` (pre-order
   * traversal). It prefers the first numeric column and otherwise takes the first column.
   * Returns null when the plan contains no `LogicalRelation`.
   */
  def getOneAttribute(plan: LogicalPlan): AttributeReference = {
    plan.collect { case l: LogicalRelation => l }.lastOption.map { relation =>
      relation.output
        .find(_.dataType.isInstanceOf[NumericType])
        .getOrElse(relation.output.head)
    }.orNull
  }
}
-
-