Posted to commits@carbondata.apache.org by jb...@apache.org on 2016/06/23 14:16:10 UTC
[22/56] [abbrv] incubator-carbondata git commit: Optimized detail query flow and cleanup (#691)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/integration/spark/src/main/java/org/carbondata/integration/spark/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/integration/spark/merger/CarbonCompactionExecutor.java b/integration/spark/src/main/java/org/carbondata/integration/spark/merger/CarbonCompactionExecutor.java
index 6ad4648..ce0f656 100644
--- a/integration/spark/src/main/java/org/carbondata/integration/spark/merger/CarbonCompactionExecutor.java
+++ b/integration/spark/src/main/java/org/carbondata/integration/spark/merger/CarbonCompactionExecutor.java
@@ -186,6 +186,7 @@ public class CarbonCompactionExecutor {
     model.setCountStarQuery(false);
     model.setDetailQuery(true);
     model.setForcedDetailRawQuery(true);
+    model.setRawBytesDetailQuery(true);
     model.setFilterExpressionResolverTree(null);
     List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
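
The flag added here drives the new flow: the compaction executor already runs a forced detail raw query over each segment, and, judging by the setter name, setRawBytesDetailQuery(true) makes that query hand dictionary-encoded values back as raw bytes, so the merger can rewrite them without a decode/re-encode round trip. A minimal sketch of the intent, using a hypothetical DetailQueryFlags stand-in (the real query model class and its remaining settings are not shown in this hunk):

// Hypothetical mirror of the flags set above; illustration only.
case class DetailQueryFlags(
    countStarQuery: Boolean,       // false: the rows themselves are needed, not a count
    detailQuery: Boolean,          // true: return every row of the segment
    forcedDetailRawQuery: Boolean, // true: bypass normal result preparation
    rawBytesDetailQuery: Boolean)  // true (new): keep dictionary key bytes undecoded

object CompactionFlagsDemo extends App {
  val flags = DetailQueryFlags(
    countStarQuery = false,
    detailQuery = true,
    forcedDetailRawQuery = true,
    rawBytesDetailQuery = true)
  println(flags)
}
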
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/integration/spark/src/main/java/org/carbondata/spark/partition/api/impl/PartitionImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/spark/partition/api/impl/PartitionImpl.java b/integration/spark/src/main/java/org/carbondata/spark/partition/api/impl/PartitionImpl.java
index 8abbc4c..6ea4c2d 100644
--- a/integration/spark/src/main/java/org/carbondata/spark/partition/api/impl/PartitionImpl.java
+++ b/integration/spark/src/main/java/org/carbondata/spark/partition/api/impl/PartitionImpl.java
@@ -28,12 +28,8 @@
  */
 package org.carbondata.spark.partition.api.impl;
 
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
-import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.query.queryinterface.query.metadata.CarbonDimensionLevelFilter;
 import org.carbondata.spark.partition.api.Partition;
 
 public class PartitionImpl implements Partition {
@@ -41,9 +37,6 @@ public class PartitionImpl implements Partition {
   private String uniqueID;
   private String folderPath;
 
-  private Map<String, CarbonDimensionLevelFilter> filterMap =
-      new HashMap<String, CarbonDimensionLevelFilter>(
-          CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
   public PartitionImpl(String uniqueID, String folderPath) {
     this.uniqueID = uniqueID;
@@ -58,10 +51,6 @@ public class PartitionImpl implements Partition {
     return folderPath;
   }
 
-  public void setPartitionDetails(String columnName, CarbonDimensionLevelFilter filter) {
-    filterMap.put(columnName, filter);
-  }
-
   @Override public String toString() {
     return "{PartitionID -> " + uniqueID + " Path: " + folderPath + '}';
   }
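
With the filter map and setPartitionDetails gone, PartitionImpl reduces to an immutable pair of uniqueID and folderPath. A Scala sketch of the equivalent shape (illustrative only, not the project's class; the store path is made up):

case class PartitionSketch(uniqueID: String, folderPath: String) {
  // same format as the toString kept above
  override def toString: String = s"{PartitionID -> $uniqueID Path: $folderPath}"
}

object PartitionSketchDemo extends App {
  println(PartitionSketch("0", "/tmp/carbonstore/0"))
}
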
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/integration/spark/src/main/java/org/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java b/integration/spark/src/main/java/org/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
index 5f00874..cae2f28 100644
--- a/integration/spark/src/main/java/org/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
+++ b/integration/spark/src/main/java/org/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
@@ -22,20 +22,18 @@ package org.carbondata.spark.partition.api.impl;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
 
 import org.carbondata.common.logging.LogService;
 import org.carbondata.common.logging.LogServiceFactory;
 import org.carbondata.core.constants.CarbonCommonConstants;
 import org.carbondata.query.carbon.model.CarbonQueryPlan;
-import org.carbondata.query.queryinterface.query.metadata.CarbonDimensionLevelFilter;
 import org.carbondata.spark.partition.api.DataPartitioner;
 import org.carbondata.spark.partition.api.Partition;
 
 import org.apache.spark.sql.execution.command.Partitioner;
 
 /**
- * Sample partition based on MSISDN.
+ * Sample partition.
  */
 public class SampleDataPartitionerImpl implements DataPartitioner {
   private static final LogService LOGGER =
@@ -78,13 +76,9 @@ public class SampleDataPartitionerImpl implements DataPartitioner {
       PartitionImpl partitionImpl =
           new PartitionImpl("" + partionCounter, baseLocation + '/' + partionCounter);
 
-      CarbonDimensionLevelFilter filter = new CarbonDimensionLevelFilter();
       List<Object> includedHashes = new ArrayList<Object>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
       includedHashes.add(partionCounter);
 
-      filter.setIncludeFilter(includedHashes);
-      partitionImpl.setPartitionDetails(partitionColumn, filter);
-
      allPartitions.add(partitionImpl);
     }
   }
@@ -121,20 +115,9 @@ public class SampleDataPartitionerImpl implements DataPartitioner {
   /**
    * Identify the partitions applicable for the given filter
    */
-  public List<Partition> getPartitions(Map<String, CarbonDimensionLevelFilter> filters) {
-    if (filters == null || filters.size() == 0 || filters.get(partitionColumn) == null) {
-      return allPartitions;
-    }
-
-    CarbonDimensionLevelFilter msisdnFilter = filters.get(partitionColumn);
-    List<Partition> allowedPartitions =
-        new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+  public List<Partition> getPartitions() {
+    return allPartitions;
-    if (msisdnFilter.getIncludeFilter().isEmpty()) {
-      // Partition check can be done only for include filter list.
-      // If the filter is of other type,return all the partitions list
-      return allPartitions;
-    }
 
     // TODO: this has to be redone during partitioning implementation
     // for (Partition aPartition : allPartitions) {
     //   CarbonDimensionLevelFilter partitionFilterDetails =
@@ -151,7 +134,6 @@ public class SampleDataPartitionerImpl implements DataPartitioner {
     // }
     // }
 
-    return allowedPartitions;
   }
 
   private int hashCode(long key) {
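
With the include-filter pruning removed (pending the TODO above), getPartitions() now always returns the full partition list; records are still routed to a partition by hashing the partition-column value, which is what the hashCode(long) helper whose signature closes this hunk is for. A standalone sketch of that routing; the modulo body is an assumption, since the helper's implementation is not part of this diff:

object HashRoutingDemo extends App {
  val numPartitions = 3

  // assumed body for hashCode(long): mask to non-negative, then take the
  // value modulo the partition count
  def partitionFor(key: Long): Int =
    (key.hashCode() & Integer.MAX_VALUE) % numPartitions

  // hypothetical MSISDN-like keys (the javadoc this commit trims mentioned MSISDN)
  Seq(9180000001L, 9180000002L, 9180000003L).foreach { key =>
    println(s"$key -> partition ${partitionFor(key)}")
  }
}
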
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 4c7c460..5359009 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -159,6 +159,7 @@ case class CarbonDictionaryDecoder(
           .createCache(CacheType.FORWARD_DICTIONARY, storePath)
         val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers,
           forwardDictionaryCache)
+        val dictIndex = dicts.zipWithIndex.filter(x => x._1 != null).map(x => x._2)
         new Iterator[InternalRow] {
           val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
           override final def hasNext: Boolean = iter.hasNext
@@ -166,15 +167,15 @@ case class CarbonDictionaryDecoder(
           override final def next(): InternalRow = {
             val row: InternalRow = iter.next()
             val data = row.toSeq(dataTypes).toArray
-            for (i <- data.indices) {
-              if (dicts(i) != null && data(i) != null) {
-                data(i) = toType(DataTypeUtil
-                  .getDataBasedOnDataType(dicts(i)
-                  .getDictionaryValueForKey(data(i).asInstanceOf[Integer]),
-                  getDictionaryColumnIds(i)._3))
+            dictIndex.foreach { index =>
+              if (data(index) != null) {
+                data(index) = DataTypeUtil
+                  .getDataBasedOnDataType(dicts(index)
+                  .getDictionaryValueForKey(data(index).asInstanceOf[Int]),
+                  getDictionaryColumnIds(index)._3)
               }
             }
-            unsafeProjection(new GenericMutableRow(data))
+            unsafeProjection(row)
           }
         }
       }
@@ -191,13 +192,6 @@ case class CarbonDictionaryDecoder(
     }
   }
 
-  private def toType(obj: Any): Any = {
-    obj match {
-      case s: String => UTF8String.fromString(s)
-      case _ => obj
-    }
-  }
-
   private def getDictionary(atiMap: Map[String, AbsoluteTableIdentifier],
       cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary]) = {
     val dicts: Seq[Dictionary] = getDictionaryColumnIds.map { f =>
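
This rewrite is the core of the commit's "optimized detail query flow": dictIndex is computed once per partition as the positions whose dictionary is non-null, so the per-row loop visits only dictionary-encoded columns instead of re-testing dicts(i) != null for every column of every row; and the toType helper goes away because DataTypeUtil.getDataBasedOnDataType already produces values in Spark's internal representation. A self-contained sketch of the same indexing idea, with plain Maps standing in for the Dictionary cache:

object DictIndexDemo extends App {
  // null marks columns that are not dictionary-encoded
  val dicts: Seq[Map[Int, String]] =
    Seq(Map(1 -> "alice", 2 -> "bob"), null, Map(1 -> "US", 2 -> "IN"))

  // computed once, outside the row loop: the counterpart of dictIndex above
  val dictIndex: Seq[Int] = dicts.zipWithIndex.filter(_._1 != null).map(_._2)

  // rows of (dictionary key, plain measure, dictionary key)
  val rows: Seq[Array[Any]] = Seq(Array[Any](1, 42L, 2), Array[Any](2, 7L, null))

  rows.foreach { data =>
    // only dictionary columns are touched; null cells are skipped
    dictIndex.foreach { i =>
      if (data(i) != null) data(i) = dicts(i)(data(i).asInstanceOf[Int])
    }
    println(data.mkString(", "))
  }
}
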
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
index ba4c37e..22fa4fb 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
@@ -187,12 +187,6 @@ case class CarbonScan(
   override def outputsUnsafeRows: Boolean = attributesNeedToDecode.size() == 0
 
   override def doExecute(): RDD[InternalRow] = {
-    def toType(obj: Any): Any = {
-      obj match {
-        case s: String => UTF8String.fromString(s)
-        case _ => obj
-      }
-    }
     val outUnsafeRows: Boolean = attributesNeedToDecode.size() == 0
     inputRdd.mapPartitions { iter =>
       val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
@@ -201,9 +195,9 @@ case class CarbonScan(
 
         override def next(): InternalRow =
           if (outUnsafeRows) {
-            unsafeProjection(new GenericMutableRow(iter.next().map(toType)))
+            unsafeProjection(new GenericMutableRow(iter.next()))
           } else {
-            new GenericMutableRow(iter.next().map(toType))
+            new GenericMutableRow(iter.next())
           }
       }
     }
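
The same toType helper is deleted here as well: after the decoder change above, string values reaching CarbonScan are presumably already UTF8String, Spark's internal representation, so each row no longer pays for an extra array copy and a per-value pattern match before being wrapped. A simplified sketch of the per-row cost that disappears (getBytes stands in for UTF8String.fromString to keep the snippet dependency-free):

object ToTypeCostDemo extends App {
  // stand-in for the deleted helper
  def toType(obj: Any): Any = obj match {
    case s: String => s.getBytes("UTF-8") // UTF8String.fromString in the real code
    case other => other
  }

  val row: Array[Any] = Array(1, "carbon", 3.14)

  val oldPath = row.map(toType) // old: fresh array plus a match per value
  val newPath = row             // new: the existing array is wrapped directly
  println(s"${oldPath(1).getClass} vs ${newPath(1).getClass}")
}
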
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/processing/src/test/java/org/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator_UT.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator_UT.java b/processing/src/test/java/org/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator_UT.java
index 6ac8b60..601bae4 100644
--- a/processing/src/test/java/org/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator_UT.java
+++ b/processing/src/test/java/org/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator_UT.java
@@ -36,7 +36,7 @@ public class TimeStampDirectDictionaryGenerator_UT {
   private int surrogateKey = -1;
 
   @Before public void setUp() throws Exception {
-    TimeStampDirectDictionaryGenerator generator = new TimeStampDirectDictionaryGenerator();
+    TimeStampDirectDictionaryGenerator generator = TimeStampDirectDictionaryGenerator.instance;
     surrogateKey = generator.generateDirectSurrogateKey("2015-10-20 12:30:01");
   }
 
@@ -46,7 +46,7 @@ public class TimeStampDirectDictionaryGenerator_UT {
    * @throws Exception
    */
   @Test public void generateDirectSurrogateKey() throws Exception {
-    TimeStampDirectDictionaryGenerator generator = new TimeStampDirectDictionaryGenerator();
+    TimeStampDirectDictionaryGenerator generator = TimeStampDirectDictionaryGenerator.instance;
     // default timestamp format is "yyyy-MM-dd HH:mm:ss" and the data being passed
     // in "dd/MM/yyyy" so the parsing should fail and method should return -1.
     int key = generator.generateDirectSurrogateKey("20/12/2014");
@@ -62,7 +62,7 @@ public class TimeStampDirectDictionaryGenerator_UT {
    * @throws Exception
    */
   @Test public void getValueFromSurrogate() throws Exception {
-    TimeStampDirectDictionaryGenerator generator = new TimeStampDirectDictionaryGenerator();
+    TimeStampDirectDictionaryGenerator generator = TimeStampDirectDictionaryGenerator.instance;
     long valueFromSurrogate = (long) generator.getValueFromSurrogate(surrogateKey);
     Date date = new Date(valueFromSurrogate / 1000);
     SimpleDateFormat timeParser = new SimpleDateFormat(CarbonProperties.getInstance()
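
The substantive change in this test file is that each test now fetches the shared TimeStampDirectDictionaryGenerator.instance instead of constructing a new generator, i.e. the generator has become a singleton. A Scala sketch of that pattern (illustrative: the real generator returns int surrogate keys and reads its format from CarbonProperties):

import java.text.SimpleDateFormat

// A Scala object is a process-wide singleton, the counterpart of the Java
// `instance` field the tests now use.
object TimestampKeyGenSketch {
  // note: SimpleDateFormat is not thread-safe, so a real shared generator
  // must guard or thread-confine its parser
  private val parser = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  // returns -1 when the value does not parse, mirroring the behaviour the
  // test comment above describes
  def generateDirectSurrogateKey(value: String): Long =
    try parser.parse(value).getTime
    catch { case _: java.text.ParseException => -1L }
}

object SingletonDemo extends App {
  println(TimestampKeyGenSketch.generateDirectSurrogateKey("2015-10-20 12:30:01"))
  println(TimestampKeyGenSketch.generateDirectSurrogateKey("20/12/2014")) // prints -1
}
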