You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by jb...@apache.org on 2016/06/23 14:16:10 UTC

[22/56] [abbrv] incubator-carbondata git commit: Optimized detail query flow and cleanup (#691)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/integration/spark/src/main/java/org/carbondata/integration/spark/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/integration/spark/merger/CarbonCompactionExecutor.java b/integration/spark/src/main/java/org/carbondata/integration/spark/merger/CarbonCompactionExecutor.java
index 6ad4648..ce0f656 100644
--- a/integration/spark/src/main/java/org/carbondata/integration/spark/merger/CarbonCompactionExecutor.java
+++ b/integration/spark/src/main/java/org/carbondata/integration/spark/merger/CarbonCompactionExecutor.java
@@ -186,6 +186,7 @@ public class CarbonCompactionExecutor {
     model.setCountStarQuery(false);
     model.setDetailQuery(true);
     model.setForcedDetailRawQuery(true);
+    model.setRawBytesDetailQuery(true);
     model.setFilterExpressionResolverTree(null);
 
     List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/integration/spark/src/main/java/org/carbondata/spark/partition/api/impl/PartitionImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/spark/partition/api/impl/PartitionImpl.java b/integration/spark/src/main/java/org/carbondata/spark/partition/api/impl/PartitionImpl.java
index 8abbc4c..6ea4c2d 100644
--- a/integration/spark/src/main/java/org/carbondata/spark/partition/api/impl/PartitionImpl.java
+++ b/integration/spark/src/main/java/org/carbondata/spark/partition/api/impl/PartitionImpl.java
@@ -28,12 +28,8 @@
  */
 package org.carbondata.spark.partition.api.impl;
 
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
-import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.query.queryinterface.query.metadata.CarbonDimensionLevelFilter;
 import org.carbondata.spark.partition.api.Partition;
 
 public class PartitionImpl implements Partition {
@@ -41,9 +37,6 @@ public class PartitionImpl implements Partition {
   private String uniqueID;
   private String folderPath;
 
-  private Map<String, CarbonDimensionLevelFilter> filterMap =
-      new HashMap<String, CarbonDimensionLevelFilter>(
-          CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
   public PartitionImpl(String uniqueID, String folderPath) {
     this.uniqueID = uniqueID;
@@ -58,10 +51,6 @@ public class PartitionImpl implements Partition {
     return folderPath;
   }
 
-  public void setPartitionDetails(String columnName, CarbonDimensionLevelFilter filter) {
-    filterMap.put(columnName, filter);
-  }
-
   @Override public String toString() {
     return "{PartitionID -> " + uniqueID + " Path: " + folderPath + '}';
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/integration/spark/src/main/java/org/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java b/integration/spark/src/main/java/org/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
index 5f00874..cae2f28 100644
--- a/integration/spark/src/main/java/org/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
+++ b/integration/spark/src/main/java/org/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
@@ -22,20 +22,18 @@ package org.carbondata.spark.partition.api.impl;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
 
 import org.carbondata.common.logging.LogService;
 import org.carbondata.common.logging.LogServiceFactory;
 import org.carbondata.core.constants.CarbonCommonConstants;
 import org.carbondata.query.carbon.model.CarbonQueryPlan;
-import org.carbondata.query.queryinterface.query.metadata.CarbonDimensionLevelFilter;
 import org.carbondata.spark.partition.api.DataPartitioner;
 import org.carbondata.spark.partition.api.Partition;
 
 import org.apache.spark.sql.execution.command.Partitioner;
 
 /**
- * Sample partition based on MSISDN.
+ * Sample partition.
  */
 public class SampleDataPartitionerImpl implements DataPartitioner {
   private static final LogService LOGGER =
@@ -78,13 +76,9 @@ public class SampleDataPartitionerImpl implements DataPartitioner {
       PartitionImpl partitionImpl =
           new PartitionImpl("" + partionCounter, baseLocation + '/' + partionCounter);
 
-      CarbonDimensionLevelFilter filter = new CarbonDimensionLevelFilter();
       List<Object> includedHashes = new ArrayList<Object>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
       includedHashes.add(partionCounter);
 
-      filter.setIncludeFilter(includedHashes);
-      partitionImpl.setPartitionDetails(partitionColumn, filter);
-
       allPartitions.add(partitionImpl);
     }
   }
@@ -121,20 +115,9 @@ public class SampleDataPartitionerImpl implements DataPartitioner {
   /**
    * Identify the partitions applicable for the given filter
    */
-  public List<Partition> getPartitions(Map<String, CarbonDimensionLevelFilter> filters) {
-    if (filters == null || filters.size() == 0 || filters.get(partitionColumn) == null) {
-      return allPartitions;
-    }
-
-    CarbonDimensionLevelFilter msisdnFilter = filters.get(partitionColumn);
-    List<Partition> allowedPartitions =
-        new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+  public List<Partition> getPartitions() {
+    return allPartitions;
 
-    if (msisdnFilter.getIncludeFilter().isEmpty()) {
-      // Partition check can be done only for include filter list.
-      // If the filter is of other type,return all the partitions list
-      return allPartitions;
-    }
     // TODO: this has to be redone during partitioning implementation
     //    for (Partition aPartition : allPartitions) {
     //      CarbonDimensionLevelFilter partitionFilterDetails =
@@ -151,7 +134,6 @@ public class SampleDataPartitionerImpl implements DataPartitioner {
     //      }
     //    }
 
-    return allowedPartitions;
   }
 
   private int hashCode(long key) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 4c7c460..5359009 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -159,6 +159,7 @@ case class CarbonDictionaryDecoder(
               .createCache(CacheType.FORWARD_DICTIONARY, storePath)
           val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers,
             forwardDictionaryCache)
+          val dictIndex = dicts.zipWithIndex.filter(x => x._1 != null).map(x => x._2)
           new Iterator[InternalRow] {
             val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
             override final def hasNext: Boolean = iter.hasNext
@@ -166,15 +167,15 @@ case class CarbonDictionaryDecoder(
             override final def next(): InternalRow = {
               val row: InternalRow = iter.next()
               val data = row.toSeq(dataTypes).toArray
-              for (i <- data.indices) {
-                if (dicts(i) != null && data(i) != null) {
-                  data(i) = toType(DataTypeUtil
-                    .getDataBasedOnDataType(dicts(i)
-                      .getDictionaryValueForKey(data(i).asInstanceOf[Integer]),
-                      getDictionaryColumnIds(i)._3))
+              dictIndex.foreach { index =>
+                if (data(index) != null) {
+                  data(index) = DataTypeUtil
+                    .getDataBasedOnDataType(dicts(index)
+                      .getDictionaryValueForKey(data(index).asInstanceOf[Int]),
+                      getDictionaryColumnIds(index)._3)
                 }
               }
-              unsafeProjection(new GenericMutableRow(data))
+              unsafeProjection(row)
             }
           }
         }
@@ -191,13 +192,6 @@ case class CarbonDictionaryDecoder(
     }
   }
 
-  private def toType(obj: Any): Any = {
-    obj match {
-      case s: String => UTF8String.fromString(s)
-      case _ => obj
-    }
-  }
-
   private def getDictionary(atiMap: Map[String, AbsoluteTableIdentifier],
       cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary]) = {
     val dicts: Seq[Dictionary] = getDictionaryColumnIds.map { f =>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
index ba4c37e..22fa4fb 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
@@ -187,12 +187,6 @@ case class CarbonScan(
   override def outputsUnsafeRows: Boolean = attributesNeedToDecode.size() == 0
 
   override def doExecute(): RDD[InternalRow] = {
-    def toType(obj: Any): Any = {
-      obj match {
-        case s: String => UTF8String.fromString(s)
-        case _ => obj
-      }
-    }
     val outUnsafeRows: Boolean = attributesNeedToDecode.size() == 0
     inputRdd.mapPartitions { iter =>
       val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
@@ -201,9 +195,9 @@ case class CarbonScan(
 
         override def next(): InternalRow =
           if (outUnsafeRows) {
-            unsafeProjection(new GenericMutableRow(iter.next().map(toType)))
+            unsafeProjection(new GenericMutableRow(iter.next()))
           } else {
-            new GenericMutableRow(iter.next().map(toType))
+            new GenericMutableRow(iter.next())
           }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/processing/src/test/java/org/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator_UT.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator_UT.java b/processing/src/test/java/org/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator_UT.java
index 6ac8b60..601bae4 100644
--- a/processing/src/test/java/org/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator_UT.java
+++ b/processing/src/test/java/org/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator_UT.java
@@ -36,7 +36,7 @@ public class TimeStampDirectDictionaryGenerator_UT {
   private int surrogateKey = -1;
 
   @Before public void setUp() throws Exception {
-    TimeStampDirectDictionaryGenerator generator = new TimeStampDirectDictionaryGenerator();
+    TimeStampDirectDictionaryGenerator generator = TimeStampDirectDictionaryGenerator.instance;
     surrogateKey = generator.generateDirectSurrogateKey("2015-10-20 12:30:01");
   }
 
@@ -46,7 +46,7 @@ public class TimeStampDirectDictionaryGenerator_UT {
    * @throws Exception
    */
   @Test public void generateDirectSurrogateKey() throws Exception {
-    TimeStampDirectDictionaryGenerator generator = new TimeStampDirectDictionaryGenerator();
+    TimeStampDirectDictionaryGenerator generator = TimeStampDirectDictionaryGenerator.instance;
     // default timestamp format is "yyyy-MM-dd HH:mm:ss" and the data being passed
     // in "dd/MM/yyyy" so the parsing should fail and method should return -1.
     int key = generator.generateDirectSurrogateKey("20/12/2014");
@@ -62,7 +62,7 @@ public class TimeStampDirectDictionaryGenerator_UT {
    * @throws Exception
    */
   @Test public void getValueFromSurrogate() throws Exception {
-    TimeStampDirectDictionaryGenerator generator = new TimeStampDirectDictionaryGenerator();
+    TimeStampDirectDictionaryGenerator generator = TimeStampDirectDictionaryGenerator.instance;
     long valueFromSurrogate = (long) generator.getValueFromSurrogate(surrogateKey);
     Date date = new Date(valueFromSurrogate / 1000);
     SimpleDateFormat timeParser = new SimpleDateFormat(CarbonProperties.getInstance()