You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/04/03 06:43:52 UTC

carbondata git commit: [CARBONDATA-2269]Support Query On PreAggregate table created on streaming table

Repository: carbondata
Updated Branches:
  refs/heads/master 32405f4f5 -> fb1516c00


[CARBONDATA-2269]Support Query On PreAggregate table created on streaming table

Support Query On Pre Aggregate table created on Streaming table
For querying the data on PreAggregate table on streaming table change the query plan to apply union of agg table and streaming segment of actual table to get the current data.
Query Example for streaming table:
User Query:
SELECT name, sum(Salary) as totalSalary
FROM maintable
Updated Query:
SELECT name, sum(totalSalary) FROM(
SELECT name, sum(Salary) as totalSalary
FROM maintable
GROUP BY name
UNION ALL
SELECT maintable_name,sum(maintable_salary) as totalSalary
FROM maintable_agg
GROUP BY maintable_name)
GROUP BY name)

User Query:
SELECT name, AVG(Salary) as avgSalary
FROM maintable.
Updated Query:
SELECT name, Divide(sum(sumSalary)/sum(countsalary))
FROM(
SELECT name, sum(Salary) as sumSalary,count(salary) countsalary
FROM maintable
GROUP BY name
UNION ALL
SELECT maintable_name,sum(maintable_salary) as sumSalary, count(maintable_salary) countsalary
FROM maintable_agg
GROUP BY maintable_name)
GROUP BY name)

User Query:
SELECT name, count(Salary) as countSalary
FROM maintable.
Updated Query:
SELECT name, sum(countsalary)
FROM(
SELECT name, count(Salary) as countSalary
FROM maintable
GROUP BY name
UNION ALL
SELECT maintable_name,sum(maintable_count)
FROM maintable_agg
GROUP BY maintable_name)
GROUP BY name)

This closes #2083


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fb1516c0
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fb1516c0
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fb1516c0

Branch: refs/heads/master
Commit: fb1516c0008da73be86b31f82df0178caa748f5d
Parents: 32405f4
Author: kumarvishal <ku...@gmail.com>
Authored: Mon Mar 19 07:53:15 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Apr 3 12:13:22 2018 +0530

----------------------------------------------------------------------
 .../CarbonCommonConstantsInternal.java          |  27 +
 .../carbondata/core/util/SessionParams.java     |   3 +
 .../hadoop/api/CarbonInputFormat.java           |  30 +
 .../hadoop/api/CarbonTableInputFormat.java      |  61 +-
 .../preaggregate/TestPreAggStreaming.scala      |  97 +++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  26 +-
 .../sql/hive/CarbonPreAggregateRules.scala      | 719 +++++++++++++++----
 .../execution/command/CarbonHiveCommands.scala  |   5 +-
 8 files changed, 801 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/fb1516c0/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstantsInternal.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstantsInternal.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstantsInternal.java
new file mode 100644
index 0000000..398e03a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstantsInternal.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.constants;
+
+/**
+ * Below class will be used to keep all the internal property constants,
+ * which will be used internally. These property will not be exposed to users
+ */
+public interface CarbonCommonConstantsInternal {
+
+  String QUERY_ON_PRE_AGG_STREAMING = "carbon.query.on.preagg.streaming.";
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fb1516c0/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 68d0daa..20968a3 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
 import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
 
@@ -203,6 +204,8 @@ public class SessionParams implements Serializable {
           isValid = true;
         } else if (key.equalsIgnoreCase(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP)) {
           isValid = true;
+        } else if (key.startsWith(CarbonCommonConstantsInternal.QUERY_ON_PRE_AGG_STREAMING)) {
+          isValid = true;
         } else {
           throw new InvalidConfigurationException(
               "The key " + key + " not supported for dynamic configuration.");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fb1516c0/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index c757ba9..f85e0e9 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -26,6 +26,7 @@ import java.util.BitSet;
 import java.util.List;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
 import org.apache.carbondata.core.datamap.DataMapChooser;
 import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.Segment;
@@ -237,6 +238,16 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   }
 
   /**
+   * Set `CARBON_INPUT_SEGMENTS` from property to configuration
+   */
+  public static void setQuerySegment(Configuration conf, String segmentList) {
+    if (!segmentList.trim().equals("*")) {
+      CarbonInputFormat
+          .setSegmentsToAccess(conf, Segment.toSegmentList(segmentList.split(",")));
+    }
+  }
+
+  /**
    * set list of segment to access
    */
   public static void setValidateSegmentsToAccess(Configuration configuration, Boolean validate) {
@@ -545,4 +556,23 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     }
     return tableName;
   }
+
+  public static void setAccessStreamingSegments(Configuration configuration, Boolean validate)
+      throws InvalidConfigurationException {
+    configuration.set(
+        CarbonCommonConstantsInternal.QUERY_ON_PRE_AGG_STREAMING + "." + getDatabaseName(
+            configuration) + "." + getTableName(configuration), validate.toString());
+  }
+
+  public static boolean getAccessStreamingSegments(Configuration configuration) {
+    try {
+      return configuration.get(
+          CarbonCommonConstantsInternal.QUERY_ON_PRE_AGG_STREAMING + "." + getDatabaseName(
+              configuration) + "." + getTableName(
+                  configuration), "false").equalsIgnoreCase("true");
+
+    } catch (InvalidConfigurationException e) {
+      return false;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fb1516c0/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 46afb36..b4a5c5e 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -143,20 +143,32 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
     SegmentStatusManager.ValidAndInvalidSegmentsInfo segments =
         segmentStatusManager.getValidAndInvalidSegments(loadMetadataDetails);
-
+    // to check whether only streaming segments access is enabled or not,
+    // if access streaming segment is true then data will be read from streaming segments
+    boolean accessStreamingSegments = getAccessStreamingSegments(job.getConfiguration());
     if (getValidateSegmentsToAccess(job.getConfiguration())) {
-      List<Segment> validSegments = segments.getValidSegments();
-      streamSegments = segments.getStreamSegments();
-      streamSegments = getFilteredSegment(job,streamSegments, true);
-      if (validSegments.size() == 0) {
-        return getSplitsOfStreaming(job, identifier, streamSegments);
-      }
-      List<Segment> filteredSegmentToAccess = getFilteredSegment(job, segments.getValidSegments(),
-          true);
-      if (filteredSegmentToAccess.size() == 0) {
-        return getSplitsOfStreaming(job, identifier, streamSegments);
+      if (!accessStreamingSegments) {
+        List<Segment> validSegments = segments.getValidSegments();
+        streamSegments = segments.getStreamSegments();
+        streamSegments = getFilteredSegment(job, streamSegments, true);
+        if (validSegments.size() == 0) {
+          return getSplitsOfStreaming(job, identifier, streamSegments);
+        }
+        List<Segment> filteredSegmentToAccess =
+            getFilteredSegment(job, segments.getValidSegments(), true);
+        if (filteredSegmentToAccess.size() == 0) {
+          return getSplitsOfStreaming(job, identifier, streamSegments);
+        } else {
+          setSegmentsToAccess(job.getConfiguration(), filteredSegmentToAccess);
+        }
       } else {
-        setSegmentsToAccess(job.getConfiguration(), filteredSegmentToAccess);
+        List<Segment> filteredNormalSegments =
+            getFilteredNormalSegments(job, segments.getValidSegments(), getSegmentsToAccess(job));
+        streamSegments = segments.getStreamSegments();
+        if (filteredNormalSegments.size() == 0) {
+          return getSplitsOfStreaming(job, identifier, streamSegments);
+        }
+        setSegmentsToAccess(job.getConfiguration(),filteredNormalSegments);
       }
       // remove entry in the segment index if there are invalid segments
       invalidSegments.addAll(segments.getInvalidSegments());
@@ -169,7 +181,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
             .clearInvalidSegments(getOrCreateCarbonTable(job.getConfiguration()), invalidSegments);
       }
     }
-    ArrayList<Segment> validAndInProgressSegments = new ArrayList<>(segments.getValidSegments());
+    List<Segment> validAndInProgressSegments = new ArrayList<>(segments.getValidSegments());
     // Add in progress segments also to filter it as in case of aggregate table load it loads
     // data from in progress table.
     validAndInProgressSegments.addAll(segments.getListOfInProgressSegments());
@@ -244,6 +256,29 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
   }
 
   /**
+   * Below method will be used to get the filter segments when query is fired on pre Aggregate
+   * and main table in case of streaming.
+   * For Pre Aggregate rules it will set all the valid segments for both streaming and
+   * and normal for fact table, so if any handoff happened in between it will
+   * select only new hand off segments segments for fact.
+   * @param job
+   * @param validSegments
+   * @param segmentsToAccess
+   * @return
+   */
+  private List<Segment> getFilteredNormalSegments(JobContext job, List<Segment> validSegments,
+      Segment[] segmentsToAccess) {
+    List<Segment> segmentToAccessSet = Arrays.asList(segmentsToAccess);
+    List<Segment> filteredSegment = new ArrayList<>();
+    for (Segment seg : validSegments) {
+      if (!segmentToAccessSet.contains(seg)) {
+        filteredSegment.add(seg);
+      }
+    }
+    return filteredSegment;
+  }
+
+  /**
    * Return segment list after filtering out valid segments and segments set by user by
    * `INPUT_SEGMENT_NUMBERS` in job configuration
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fb1516c0/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggStreaming.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggStreaming.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggStreaming.scala
new file mode 100644
index 0000000..0b644f5
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggStreaming.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.integration.spark.testsuite.preaggregate
+
+import org.apache.spark.sql.CarbonDatasourceHadoopRelation
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Union}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, Ignore}
+
+
+class TestPreAggStreaming extends QueryTest with BeforeAndAfterAll {
+
+
+  override def beforeAll: Unit = {
+    sql("drop table if exists mainTable")
+    sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format' tblproperties('streaming'='true')")
+    sql("create datamap agg0 on table mainTable using 'preaggregate' as select name from mainTable group by name")
+    sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(age) from mainTable group by name")
+    sql("create datamap agg2 on table mainTable using 'preaggregate' as select name,avg(age) from mainTable group by name")
+    sql("create datamap agg3 on table mainTable using 'preaggregate' as select name,sum(CASE WHEN age=35 THEN id ELSE 0 END) from mainTable group by name")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
+  }
+
+  test("Test Pre Agg Streaming with project column and group by") {
+    val df = sql("select name from maintable group by name")
+    df.collect()
+    assert(validateStreamingTablePlan(df.queryExecution.analyzed))
+  }
+
+  test("Test Pre Agg Streaming table Agg Sum Aggregation") {
+    val df = sql("select name, sum(age) from maintable group by name")
+    df.collect()
+    assert(validateStreamingTablePlan(df.queryExecution.analyzed))
+  }
+
+  test("Test Pre Agg Streaming table With Sum Aggregation And Order by") {
+    val df = sql("select name, sum(age) from maintable group by name order by name")
+    df.collect()
+    assert(validateStreamingTablePlan(df.queryExecution.analyzed))
+  }
+
+  test("Test Pre Agg Streaming table With Avg Aggregation") {
+    val df = sql("select name, avg(age) from maintable group by name order by name")
+    df.collect()
+    assert(validateStreamingTablePlan(df.queryExecution.analyzed))
+  }
+
+  test("Test Pre Agg Streaming table With Expression Aggregation") {
+    val df = sql("select name, sum(CASE WHEN age=35 THEN id ELSE 0 END) from maintable group by name order by name")
+    df.collect()
+    assert(validateStreamingTablePlan(df.queryExecution.analyzed))
+  }
+
+  /**
+   * Below method will be used validate whether plan is already updated in case of streaming table
+   * In case of streaming table it will add UnionNode to get the data from fact and aggregate both
+   * as aggregate table will be updated after each handoff.
+   * So if plan is already updated no need to transform the plan again
+   * @param logicalPlan
+   * query plan
+   * @return whether need to update the query plan or not
+   */
+  def validateStreamingTablePlan(logicalPlan: LogicalPlan) : Boolean = {
+    var isChildTableExists: Boolean = false
+    logicalPlan.transform {
+      case union @ Union(Seq(plan1, plan2)) =>
+        plan2.collect{
+          case logicalRelation: LogicalRelation if
+          logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+          logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+            .isChildDataMap =>
+            isChildTableExists = true
+        }
+        union
+    }
+    isChildTableExists
+  }
+
+  override def afterAll: Unit = {
+    sql("drop table if exists mainTable")
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fb1516c0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 7e549a6..29acfff 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.profiler.{GetPartition, Profiler, QueryTaskEnd}
 import org.apache.spark.sql.util.SparkSQLUtil.sessionState
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonCommonConstantsInternal}
 import org.apache.carbondata.core.datastore.block.Distributable
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
@@ -556,14 +556,30 @@ class CarbonScanRDD(
       CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
       CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob)
     }
-
     // when validate segments is disabled in thread local update it to CarbonTableInputFormat
     val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
     if (carbonSessionInfo != null) {
+      val tableUniqueKey = identifier.getDatabaseName + "." + identifier.getTableName
+      val validateInputSegmentsKey = CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+                                     tableUniqueKey
       CarbonInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getSessionParams
-          .getProperty(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
-                       identifier.getCarbonTableIdentifier.getDatabaseName + "." +
-                       identifier.getCarbonTableIdentifier.getTableName, "true").toBoolean)
+                                       .getProperty(validateInputSegmentsKey, "true").toBoolean)
+      val queryOnPreAggStreamingKey = CarbonCommonConstantsInternal.QUERY_ON_PRE_AGG_STREAMING +
+                                  tableUniqueKey
+      val queryOnPreAggStreaming = carbonSessionInfo.getThreadParams
+        .getProperty(queryOnPreAggStreamingKey, "false").toBoolean
+      CarbonInputFormat.setAccessStreamingSegments(conf, queryOnPreAggStreaming)
+      val inputSegmentsKey = CarbonCommonConstants.CARBON_INPUT_SEGMENTS + tableUniqueKey
+      if(queryOnPreAggStreaming) {
+        CarbonInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getThreadParams
+          .getProperty(validateInputSegmentsKey, "true").toBoolean)
+        CarbonInputFormat
+          .setQuerySegment(conf,
+            carbonSessionInfo.getThreadParams.getProperty(inputSegmentsKey, "*"))
+        carbonSessionInfo.getThreadParams.removeProperty(queryOnPreAggStreamingKey)
+        carbonSessionInfo.getThreadParams.removeProperty(inputSegmentsKey)
+        carbonSessionInfo.getThreadParams.removeProperty(validateInputSegmentsKey)
+      }
     }
     format
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fb1516c0/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index e8374e3..5c553b9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -34,10 +34,11 @@ import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.sql.types._
 import org.apache.spark.util.CarbonReflectionUtils
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonCommonConstantsInternal}
 import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.preagg.{AggregateQueryPlan, AggregateTableSelector, QueryColumn}
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.{CarbonUtil, ThreadLocalSessionInfo}
 
 /**
@@ -89,6 +90,50 @@ case class AggExpToColumnMappingModel(
  * 6. timeseries function
  *    6.1 validate maintable has timeseries datamap
  *    6.2 timeseries function is valid function or not
+ * 7. Streaming
+ * Examples1:
+ * Query:
+ *   SELECT name, sum(Salary) as totalSalary
+ *   FROM maintable.
+ * UpdatedQuery:
+ *   SELECT name, sum(totalSalary) FROM(
+ *          SELECT name, sum(Salary) as totalSalary
+ *          FROM maintable
+ *          GROUP BY name
+ *          UNION ALL
+ *          SELECT maintable_name,sum(maintable_salary) as totalSalary
+ *          FROM maintable_agg
+ *          GROUP BY maintable_name)
+ *   GROUP BY name)
+ * Example2:
+ * Query:
+ *   SELECT name, AVG(Salary) as avgSalary
+ *        FROM maintable.
+ * UpdatedQuery:
+ *   SELECT name, Divide(sum(sumSalary)/sum(countsalary))
+ *   FROM(
+ *    SELECT name, sum(Salary) as sumSalary,count(salary) countsalary
+ *      FROM maintable
+ *      GROUP BY name
+ *    UNION ALL
+ *    SELECT maintable_name,sum(maintable_salary) as sumSalary, count(maintable_salary) countsalary
+ *      FROM maintable_agg
+ *      GROUP BY maintable_name)
+ *   GROUP BY name)
+ *
+ * Rules for updating plan in case of streaming table:
+ * In case of streaming data will be fetched from both fact and aggregate as aggregate table
+ * will be updated only after each hand-off, so current streamed data won't be available on
+ * aggregate table.
+ * 7.1 Add one union node to add both fact and aggregate table plan to get the data from both table
+ * 7.2 On top of Union Node add one Aggregate node to aggregate both table results
+ * 7.3 In case of average(avg(column)) special handling is required for streaming
+ *     7.3.1 Fact Plan will updated to return sum(column) and count(column) to do rollup
+ *     7.3.2 Aggregate Plan will updated to return sum(column) and count(column) to do rollup
+ * 7.4 In newly added Aggregate node all the aggregate expression must have same expression id as
+ *     fact and fact plan will updated with new expression id. As query like order by this can be
+ *     referred. In example1 sum(totalSalary) as totalSalary will have same expression id
+ *     as in fact and fact plan sum(salary) will be updated with new expression id
  *
  * @param sparkSession spark session
  */
@@ -142,6 +187,9 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
     }
     if(needAnalysis) {
       needAnalysis = isValidPlan(plan)
+      if(needAnalysis) {
+        needAnalysis = validateStreamingTablePlan(plan)
+      }
     }
     // if plan is not valid for transformation then return same plan
     if (!needAnalysis) {
@@ -151,6 +199,32 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
       val newPlan = updatePlan(updatedPlan)
       newPlan
     }
+
+  }
+
+  /**
+   * Below method will be used validate whether plan is already updated in case of streaming table
+   * In case of streaming table it will add UnionNode to get the data from fact and aggregate both
+   * as aggregate table will be updated after each handoff.
+   * So if plan is already updated no need to transform the plan again
+   * @param logicalPlan
+   * query plan
+   * @return whether need to update the query plan or not
+   */
+  def validateStreamingTablePlan(logicalPlan: LogicalPlan) : Boolean = {
+    var needTransformation: Boolean = true
+    logicalPlan.transform {
+      case union @ Union(Seq(plan1, plan2)) =>
+        plan2.collect{
+          case logicalRelation: LogicalRelation if
+          logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+          logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+            .isChildDataMap =>
+            needTransformation = false
+        }
+        union
+    }
+    needTransformation
   }
 
   /**
@@ -191,7 +265,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
    * @return updated sort expression
    */
   def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
-     sortExp map { order =>
+    sortExp map { order =>
       SortOrder(order.child transform  {
         case attr: AttributeReference =>
           updatedExpression.find { p => p._1.sameRef(attr) } match {
@@ -254,6 +328,21 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
    * 5. timeseries function
    *    5.1 validate parent table has timeseries datamap
    *    5.2 timeseries function is valid function or not
+   * 6. Streaming
+   * Rules for updating plan in case of streaming table:
+   * In case of streaming data will be fetched from both fact and aggregate as aggregate table
+   * will be updated only after each hand-off, so current streamed data won't be available on
+   * aggregate table.
+   * 6.1 Add one union node to add both fact and aggregate table plan to
+   *     get the data from both table
+   * 6.2 On top of Union Node add one Aggregate node to aggregate both table results
+   * 6.3 In case of average(avg(column)) special handling is required for streaming
+   *     7.3.1 Fact Plan will updated to return sum(column) and count(column) to do rollup
+   *     7.3.2 Aggregate Plan will updated to return sum(column) and count(column) to do rollup
+   * 6.4 In newly added Aggregate node all the aggregate expression must have same expression id as
+   *     fact and fact plan will updated with new expression id. As query like order by this can be
+   *     referred. In example1 sum(totalSalary) as totalSalary will have same expression id
+   *     as in fact and fact plan sum(salary) will be updated with new expression id
    *
    * @param logicalPlan parent logical plan
    * @return transformed plan
@@ -267,7 +356,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         CarbonSubqueryAlias(alias1, child@CarbonSubqueryAlias(alias2, l: LogicalRelation)))
         if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
            l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
-             metaData.hasAggregateDataMapSchema =>
+             metaData.hasAggregateDataMapSchema && !isPlanUpdated =>
         val carbonTable = getCarbonTable(l)
         if(isSpecificSegmentNotPresent(carbonTable)) {
           val list = scala.collection.mutable.HashSet.empty[QueryColumn]
@@ -296,14 +385,26 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
                   carbonTable,
                   agg)
               isPlanUpdated = true
-              Aggregate(updatedGroupExp,
+              val updateAggPlan =
+                Aggregate(
+                updatedGroupExp,
                 updatedAggExp,
-                CarbonReflectionUtils
-                  .getSubqueryAlias(sparkSession,
-                    Some(alias1),
-                    CarbonReflectionUtils
-                      .getSubqueryAlias(sparkSession, Some(alias2), newChild, None),
-                    None))
+                CarbonReflectionUtils.getSubqueryAlias(
+                  sparkSession,
+                  Some(alias1),
+                  CarbonReflectionUtils.getSubqueryAlias(
+                    sparkSession,
+                    Some(alias2),
+                    newChild,
+                    None),
+                  None))
+              getAggregateQueryPlan(
+                updateAggPlan,
+                grExp,
+                aggExp,
+                carbonTable,
+                aggDataMapSchema,
+                agg)
             } else {
               agg
             }
@@ -315,12 +416,12 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         }
       // case for aggregation query
       case agg@Aggregate(
-        grExp,
-        aggExp,
-        child@CarbonSubqueryAlias(alias, l: LogicalRelation))
+      grExp,
+      aggExp,
+      child@CarbonSubqueryAlias(alias, l: LogicalRelation))
         if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
            l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
-             metaData.hasAggregateDataMapSchema =>
+             metaData.hasAggregateDataMapSchema && !isPlanUpdated =>
         val carbonTable = getCarbonTable(l)
         if(isSpecificSegmentNotPresent(carbonTable)) {
           val list = scala.collection.mutable.HashSet.empty[QueryColumn]
@@ -349,10 +450,22 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
                   carbonTable,
                   agg)
               isPlanUpdated = true
-              Aggregate(updatedGroupExp,
+              val updateAggPlan =
+                Aggregate(
+                updatedGroupExp,
                 updatedAggExp,
-                CarbonReflectionUtils
-                  .getSubqueryAlias(sparkSession, Some(alias), newChild, None))
+                CarbonReflectionUtils.getSubqueryAlias(
+                  sparkSession,
+                  Some(alias),
+                  newChild,
+                  None))
+              getAggregateQueryPlan(
+                updateAggPlan,
+                grExp,
+                aggExp,
+                carbonTable,
+                aggDataMapSchema,
+                agg)
             } else {
               agg
             }
@@ -364,12 +477,12 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         }
       // case of handling aggregation query with filter
       case agg@Aggregate(
-        grExp,
-        aggExp,
-        Filter(expression, child@CarbonSubqueryAlias(alias, l: LogicalRelation)))
+      grExp,
+      aggExp,
+      Filter(expression, child@CarbonSubqueryAlias(alias, l: LogicalRelation)))
         if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
            l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
-             metaData.hasAggregateDataMapSchema =>
+             metaData.hasAggregateDataMapSchema && !isPlanUpdated =>
         val carbonTable = getCarbonTable(l)
         if(isSpecificSegmentNotPresent(carbonTable)) {
           val list = scala.collection.mutable.HashSet.empty[QueryColumn]
@@ -405,11 +518,24 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
                   carbonTable,
                   agg)
               isPlanUpdated = true
-              Aggregate(updatedGroupExp,
+              val updateAggPlan =
+                Aggregate(
+                updatedGroupExp,
                 updatedAggExp,
-                Filter(updatedFilterExpression.get,
-                  CarbonReflectionUtils
-                    .getSubqueryAlias(sparkSession, Some(alias), newChild, None)))
+                Filter(
+                  updatedFilterExpression.get,
+                  CarbonReflectionUtils.getSubqueryAlias(
+                    sparkSession,
+                    Some(alias),
+                    newChild,
+                    None)))
+              getAggregateQueryPlan(
+                updateAggPlan,
+                grExp,
+                aggExp,
+                carbonTable,
+                aggDataMapSchema,
+                agg)
             } else {
               agg
             }
@@ -420,14 +546,14 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           agg
         }
       case agg@Aggregate(
-        grExp,
-        aggExp,
-        Filter(
-          expression,
-          CarbonSubqueryAlias(alias1, child@CarbonSubqueryAlias(alias2, l: LogicalRelation))))
+      grExp,
+      aggExp,
+      Filter(
+      expression,
+      CarbonSubqueryAlias(alias1, child@CarbonSubqueryAlias(alias2, l: LogicalRelation))))
         if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
            l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
-             metaData.hasAggregateDataMapSchema =>
+             metaData.hasAggregateDataMapSchema && !isPlanUpdated =>
         val carbonTable = getCarbonTable(l)
         if(isSpecificSegmentNotPresent(carbonTable)) {
           val list = scala.collection.mutable.HashSet.empty[QueryColumn]
@@ -438,16 +564,13 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             carbonTable,
             list,
             aggregateExpressions)
-
           if (isValidPlan) {
             isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(expression)
           }
-
           // getting the columns from filter expression
           if (isValidPlan) {
             extractColumnFromExpression(expression, list, carbonTable, true)
           }
-
           if (isValidPlan) {
             val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list,
               aggregateExpressions,
@@ -466,15 +589,28 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
                   carbonTable,
                   agg)
               isPlanUpdated = true
-              Aggregate(updatedGroupExp,
-                updatedAggExp,
-                Filter(updatedFilterExpression.get,
-                  CarbonReflectionUtils
-                    .getSubqueryAlias(sparkSession,
+              val updateAggPlan =
+                Aggregate(
+                  updatedGroupExp,
+                  updatedAggExp,
+                  Filter(
+                    updatedFilterExpression.get,
+                    CarbonReflectionUtils.getSubqueryAlias(
+                      sparkSession,
                       Some(alias1),
-                      CarbonReflectionUtils
-                        .getSubqueryAlias(sparkSession, Some(alias2), newChild, None),
+                      CarbonReflectionUtils.getSubqueryAlias(
+                        sparkSession,
+                        Some(alias2),
+                        newChild,
+                        None),
                       None)))
+              getAggregateQueryPlan(
+                updateAggPlan,
+                grExp,
+                aggExp,
+                carbonTable,
+                aggDataMapSchema,
+                agg)
             } else {
               agg
             }
@@ -494,6 +630,215 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
   }
 
   /**
+   * Method to get the aggregate query plan
+   * @param aggPlan
+   * aggregate table query plan
+   * @param grExp
+   * fact group by expression
+   * @param aggExp
+   * fact aggregate expression
+   * @param carbonTable
+   * fact table
+   * @param aggregationDataMapSchema
+   * selected aggregation data map
+   * @param factAggPlan
+   * fact aggregate query plan
+   * @return updated plan
+   */
+  def getAggregateQueryPlan(aggPlan: LogicalPlan,
+      grExp: Seq[Expression],
+      aggExp: Seq[NamedExpression],
+      carbonTable: CarbonTable,
+      aggregationDataMapSchema: DataMapSchema,
+      factAggPlan: LogicalPlan): LogicalPlan = {
+    // to handle streaming table with pre aggregate
+    if (carbonTable.isStreamingTable) {
+      setSegmentsForStreaming(carbonTable, aggregationDataMapSchema)
+      // get new fact expression
+      val factExp = updateFactTablePlanForStreaming(factAggPlan)
+      // get new Aggregate node expression
+      val streamingNodeExp = getExpressionsForStreaming(aggExp)
+      // clear the expression as in case of streaming it is not required
+      updatedExpression.clear
+      // Add Aggregate node to aggregate data from fact and aggregate
+      Aggregate(
+        grExp,
+        streamingNodeExp.asInstanceOf[Seq[NamedExpression]],
+        // add union node to get the result from both
+        Union(
+          factExp,
+          aggPlan))
+    } else {
+      aggPlan
+    }
+  }
+
+  /**
+   * Method to set the segments when query is fired on streaming table with pre aggregate
+   * Adding a property streaming_seg so while removing from session params we can differentiate
+   * it was set from CarbonPreAggregateRules
+   * @param parentTable
+   * parent arbon table
+   * @param dataMapSchema
+   * child datamap schema
+   */
+  def setSegmentsForStreaming(parentTable: CarbonTable, dataMapSchema: DataMapSchema): Unit = {
+    val mainTableKey = parentTable.getDatabaseName + '.' + parentTable.getTableName
+    val factManager = new SegmentStatusManager(parentTable.getAbsoluteTableIdentifier)
+    CarbonSession
+      .threadSet(CarbonCommonConstantsInternal.QUERY_ON_PRE_AGG_STREAMING + mainTableKey, "true")
+    CarbonSession
+      .threadSet(
+        CarbonCommonConstants.CARBON_INPUT_SEGMENTS + mainTableKey,
+        factManager.getValidAndInvalidSegments.getValidSegments.asScala.mkString(","))
+    CarbonSession
+      .threadSet(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + mainTableKey, "true")
+    // below code is for aggregate table
+    val identifier = TableIdentifier(
+      dataMapSchema.getChildSchema.getTableName,
+      Some(parentTable.getDatabaseName))
+    val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val carbonRelation =
+      catalog.lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation]
+    val segmentStatusManager = new SegmentStatusManager(carbonRelation.carbonTable
+      .getAbsoluteTableIdentifier)
+    val validSegments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala
+      .mkString(",")
+    val childTableKey = carbonRelation.carbonTable.getDatabaseName + '.' +
+                   carbonRelation.carbonTable.getTableName
+    CarbonSession
+      .threadSet(CarbonCommonConstantsInternal.QUERY_ON_PRE_AGG_STREAMING + childTableKey, "true")
+    CarbonSession
+      .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + childTableKey, validSegments)
+    CarbonSession
+      .threadSet(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + childTableKey, "false")
+  }
+
+  /**
+   * Map to keep expression name to its alias mapping. This will be used while adding a node when
+   * plan for streaming table is updated.
+   * Note: In case of average fact table plan will have two alias as sum(column) and count(column)
+   * to support rollup
+   */
+  private val factPlanExpForStreaming = mutable.HashMap[String, Seq[NamedExpression]]()
+
+  /**
+   * Below method will be used to get the expression for Aggregate node added for streaming
+   * Expression id will be same as fact plan as it can be referred in query
+   *
+   * @param aggExp
+   * main table aggregate expression
+   * @return updated aggregate expression
+   */
+  private def getExpressionsForStreaming(aggExp: Seq[Expression]): Seq[Expression] = {
+    val updatedExp = aggExp map {
+      case attr: AttributeReference =>
+        attr
+      case alias@Alias(aggExp: AggregateExpression, name) =>
+        // in case of aggregate expression get the fact alias based on expression name
+        val factAlias = factPlanExpForStreaming(name)
+        // create attribute reference object for each expression
+        val attrs = factAlias.map { factAlias =>
+          AttributeReference(
+            name,
+            alias.dataType,
+            alias.nullable) (factAlias.exprId, alias.qualifier, alias.isGenerated)
+        }
+        // add aggregate function in Aggregate node added for handling streaming
+        // to aggregate results from fact and aggregate table
+        val updatedAggExp = getAggregateExpressionForAggregation(aggExp, attrs)
+        // same reference id will be used as it can be used by above nodes in the plan like
+        // sort, project, join
+        Alias(
+          updatedAggExp.head,
+          name)(alias.exprId, alias.qualifier, Option(alias.metadata), alias.isGenerated)
+      case alias@Alias(expression, name) =>
+        AttributeReference(
+          name,
+          alias.dataType,
+          alias.nullable) (alias.exprId, alias.qualifier, alias.isGenerated)
+    }
+    updatedExp
+  }
+
+  /**
+   * Below method will be used to update the fact plan in case of streaming table
+   * This is required to handle average aggregte function as in case of average we need to return
+   * two columns data sum(column) and count(column) to get the correct result
+   *
+   * @param logicalPlan
+   * fact table Aggregate plan
+   * @return updated aggregate plan for fact
+   */
+  private def updateFactTablePlanForStreaming(logicalPlan: LogicalPlan) : LogicalPlan = {
+    // only aggregate expression needs to be updated
+    logicalPlan.transform{
+      case agg@Aggregate(_, aggExp, _) =>
+        agg
+          .copy(aggregateExpressions = updateAggExpInFactForStreaming(aggExp)
+            .asInstanceOf[Seq[NamedExpression]])
+    }
+  }
+
+  /**
+   * Below method will be used to update the aggregate expression for streaming fact table plan
+   * @param namedExp
+   * streaming Fact plan aggregate expression
+   * @return
+   * Updated streaming fact plan aggregate expression
+   */
+  private def updateAggExpInFactForStreaming(namedExp : Seq[NamedExpression]) : Seq[Expression] = {
+    val updatedExp = namedExp.flatMap {
+      case attr: AttributeReference =>
+        Seq(attr)
+      case alias@Alias(aggExp: AggregateExpression, name) =>
+        // get the new aggregate expression
+        val newAggExp = getAggFunctionForFactStreaming(aggExp)
+        val updatedExp = newAggExp.map { exp =>
+          Alias(exp,
+            name)(
+            NamedExpression.newExprId,
+            alias.qualifier,
+            Some(alias.metadata),
+            alias.isGenerated)
+        }
+        // adding to map which will be used while Adding an Aggregate node for handling streaming
+        // table plan change
+        factPlanExpForStreaming.put(name, updatedExp)
+        updatedExp
+      case Alias(exp: Expression, _) =>
+        Seq(exp)
+    }
+    updatedExp
+  }
+  /**
+   * Below method will be used to update the fact table query aggregate function expression
+   * Rules for updating the expression.
+   * In case of average return sum(expression), count(expression) to get the correct result
+   * @param aggExp
+   * actual query aggregate expression
+   * @return seq of expression as in case of average we need to return two sum and count
+   *
+   */
+  def getAggFunctionForFactStreaming(aggExp: AggregateExpression): Seq[Expression] = {
+    aggExp.aggregateFunction match {
+      case Average(MatchCastExpression(exp: Expression, changeDataType: DataType)) =>
+        val newExp = Seq(AggregateExpression(Sum(Cast(exp, changeDataType)),
+          aggExp.mode,
+          isDistinct = false),
+          Cast(AggregateExpression(Count(exp), aggExp.mode, false), DoubleType))
+        newExp
+      case Average(exp: Expression) =>
+        val newExp = Seq(AggregateExpression(Sum(exp), aggExp.mode, false),
+          Cast(AggregateExpression(Count(exp), aggExp.mode, false), DoubleType))
+        newExp
+      case _ =>
+        val newExp = Seq(aggExp)
+        newExp
+    }
+  }
+
+  /**
    * Below method will be used to validate query plan and get the proper aggregation data map schema
    * and child relation plan object if plan is valid for transformation
    * @param queryColumns list of query columns from projection and filter
@@ -839,7 +1184,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
     // with child attribute reference. Please check class level documentation how when aggregate
     // function will be updated
 
-    val updatedAggExp = aggregateExpressions.map {
+    val updatedAggExp = aggregateExpressions.flatMap {
       // case for attribute reference
       case attr: AttributeReference =>
         val childAttr = getChildAttributeReference(aggDataMapSchema,
@@ -852,9 +1197,9 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           childAttr.metadata)(newExpressionId, childAttr.qualifier, attr.isGenerated)
         updatedExpression.put(attr, childTableAttr)
         // returning the alias to show proper column name in output
-        Alias(childAttr,
+        Seq(Alias(childAttr,
           attr.name)(newExpressionId,
-          childAttr.qualifier).asInstanceOf[NamedExpression]
+          childAttr.qualifier).asInstanceOf[NamedExpression])
       // case for alias
       case alias@Alias(attr: AttributeReference, name) =>
         val childAttr = getChildAttributeReference(aggDataMapSchema,
@@ -869,9 +1214,9 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           alias.nullable) (newExpressionId, alias.qualifier, alias.isGenerated)
         updatedExpression.put(parentTableAttr, childTableAttr)
         // returning alias with child attribute reference
-        Alias(childAttr,
+        Seq(Alias(childAttr,
           name)(newExpressionId,
-          childAttr.qualifier).asInstanceOf[NamedExpression]
+          childAttr.qualifier).asInstanceOf[NamedExpression])
       // for aggregate function case
       case alias@Alias(attr: AggregateExpression, name) =>
         // get the updated aggregate aggregate function
@@ -881,22 +1226,43 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             attributes,
             parentTable,
             parentLogicalPlan,
-            aggExpColumnMapping.get)
+            aggExpColumnMapping.get,
+            parentTable.isStreamingTable)
         } else {
-          attr
+          Seq(attr)
+        }
+        if(!parentTable.isStreamingTable) {
+          // for normal table
+          // generate new expression id for child
+          val newExpressionId = NamedExpression.newExprId
+          // create a parent attribute reference which will be replced on node which may be referred
+          // by node like sort join
+          val parentTableAttr = AttributeReference(name,
+            alias.dataType,
+            alias.nullable)(alias.exprId, alias.qualifier, alias.isGenerated)
+          // creating a child attribute reference which will be replced
+          val childTableAttr = AttributeReference(name,
+            alias.dataType,
+            alias.nullable)(newExpressionId, alias.qualifier, alias.isGenerated)
+          // adding to map, will be used during other node updation like sort, join, project
+          updatedExpression.put(parentTableAttr, childTableAttr)
+          // returning alias with child attribute reference
+          Seq(Alias(aggExp.head,
+            name)(newExpressionId,
+            alias.qualifier).asInstanceOf[NamedExpression])
+        } else {
+          // for streaming table
+          // create alias for aggregate table
+          val aggExpForStreaming = aggExp.map{ exp =>
+            Alias(exp,
+              name)(
+              NamedExpression.newExprId,
+              alias.qualifier,
+              Some(alias.metadata),
+              alias.isGenerated).asInstanceOf[NamedExpression]
+          }
+          aggExpForStreaming
         }
-        val newExpressionId = NamedExpression.newExprId
-        val parentTableAttr = AttributeReference(name,
-          alias.dataType,
-          alias.nullable) (alias.exprId, alias.qualifier, alias.isGenerated)
-        val childTableAttr = AttributeReference(name,
-          alias.dataType,
-          alias.nullable) (newExpressionId, alias.qualifier, alias.isGenerated)
-        updatedExpression.put(parentTableAttr, childTableAttr)
-        // returning alias with child attribute reference
-        Alias(aggExp,
-          name)(newExpressionId,
-          alias.qualifier).asInstanceOf[NamedExpression]
       case alias@Alias(expression: Expression, name) =>
         val updatedExp =
           if (expression.isInstanceOf[ScalaUDF] &&
@@ -929,8 +1295,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           alias.dataType,
           alias.nullable) (newExpressionId, alias.qualifier, alias.isGenerated)
         updatedExpression.put(parentTableAttr, childTableAttr)
-        Alias(updatedExp, name)(newExpressionId,
-          alias.qualifier).asInstanceOf[NamedExpression]
+        Seq(Alias(updatedExp, name)(newExpressionId,
+          alias.qualifier).asInstanceOf[NamedExpression])
     }
     // transforming the logical relation
     val newChild = child.transform {
@@ -980,8 +1346,9 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
       attributes: Seq[AttributeReference],
       parentTable: CarbonTable,
       parentLogicalPlan: LogicalPlan,
-      aggExpColumnMapping: mutable.LinkedHashSet[AggExpToColumnMappingModel]):
-  Expression = {
+      aggExpColumnMapping: mutable.LinkedHashSet[AggExpToColumnMappingModel],
+      isStreamingTable: Boolean):
+  Seq[Expression] = {
     // get the updated aggregate expression, in case of average column
     // it will be divided in two aggergate expression
     val updatedAggExp = PreAggregateUtil.validateAggregateFunctionAndGetFields(aggExp)
@@ -995,53 +1362,111 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         attributes filter (_.name.equalsIgnoreCase(
           schemaAggExpModel.columnSchema.get.asInstanceOf[ColumnSchema].getColumnName))
     }.flatten
+    // getting aggregate table aggregate expressions
+    getAggregateExpressionForAggregation(aggExp, attrs.toSeq, isStreamingTable)
+  }
 
+  /**
+   * Below method will be used to update the aggregate expression.
+   * 1.In case of average below expression will be returned.
+   * 1.1 Streaming table
+   *    1.1.1 Aggregate table
+   *        It will return sum(expression) and count(expression)
+   *    1.2.1 Aggregate node added for streaming
+   *        It will return Divide(sum(expression)/count(expression))
+   * 2.1 Normal table
+   *    2.1.1 Aggregate table
+   *      It will return Divide(sum(expression)/count(expression))
+   * 2. In case of count for aggregate table and aggregate node added for streaming
+   *    table count will be aggregated to sum
+   *
+   * @param aggExp
+   * aggregate expression
+   * @param attrs
+   * aggregate function Attribute, in case of average it will be two to support rollup
+   * @return
+   * aggregate expression
+   */
+  def getAggregateExpressionForAggregation(aggExp: AggregateExpression,
+      attrs: Seq[AttributeReference],
+      isStreamingTable: Boolean = false): Seq[Expression] = {
     aggExp.aggregateFunction match {
       case Sum(MatchCastExpression(_, changeDataType: DataType)) =>
-        AggregateExpression(Sum(Cast(attrs.head, changeDataType)), aggExp.mode, isDistinct = false)
+        Seq(AggregateExpression(Sum(Cast(attrs.head, changeDataType)), aggExp.mode, false))
       case Sum(_) =>
-        AggregateExpression(Sum(attrs.head), aggExp.mode, isDistinct = false)
+        Seq(AggregateExpression(Sum(attrs.head), aggExp.mode, false))
       case Max(MatchCastExpression(_, changeDataType: DataType)) =>
-        AggregateExpression(Max(Cast(attrs.head, changeDataType)), aggExp.mode, isDistinct = false)
+        Seq(AggregateExpression(Max(Cast(attrs.head, changeDataType)), aggExp.mode, false))
       case Max(_) =>
-        AggregateExpression(Max(attrs.head), aggExp.mode, isDistinct = false)
+        Seq(AggregateExpression(Max(attrs.head), aggExp.mode, false))
       case Min(MatchCastExpression(_, changeDataType: DataType)) =>
-        AggregateExpression(Min(Cast(attrs.head, changeDataType)), aggExp.mode, isDistinct = false)
+        Seq(AggregateExpression(Min(Cast(attrs.head, changeDataType)), aggExp.mode, false))
       case Min(_) =>
-        AggregateExpression(Min(attrs.head), aggExp.mode, isDistinct = false)
+        Seq(AggregateExpression(Min(attrs.head), aggExp.mode, false))
       // Change the count AggregateExpression to Sum as count
       // is already calculated so in case of aggregate table
       // we need to apply sum to get the count
       case Count(Seq(expression: Expression)) =>
-        AggregateExpression(Sum(Cast(attrs.head, LongType)), aggExp.mode, isDistinct = false)
-      // In case of average aggregate function select 2 columns from aggregate table
-      // with aggregation sum and count.
-      // Then add divide(sum(column with sum), sum(column with count)).
+        Seq(AggregateExpression(Sum(Cast(attrs.head, LongType)), aggExp.mode, false))
+
       case Average(MatchCastExpression(exp: Expression, changeDataType: DataType)) =>
-        Divide(AggregateExpression(Sum(Cast(
-          attrs.head,
-          DoubleType)),
-          aggExp.mode,
-          isDistinct = false),
-          AggregateExpression(Sum(Cast(
-            attrs.last,
+        // for handling Normal table case/Aggregate node added in case of streaming table
+        if (!isStreamingTable) {
+          // In case of average aggregate function select 2 columns from aggregate table
+          // with aggregation sum and count.
+          // Then add divide(sum(column with sum), sum(column with count)).
+          Seq(Divide(AggregateExpression(Sum(Cast(
+            attrs.head,
             DoubleType)),
             aggExp.mode,
-            isDistinct = false))
-      // In case of average aggregate function select 2 columns from aggregate table
-      // with aggregation sum and count.
-      // Then add divide(sum(column with sum), sum(column with count)).
+            false),
+            AggregateExpression(Sum(Cast(
+              attrs.last,
+              DoubleType)),
+              aggExp.mode,
+              false)))
+        } else {
+          // in case of streaming aggregate table return two aggregate function sum and count
+          Seq(AggregateExpression(Sum(Cast(
+            attrs.head,
+            DoubleType)),
+            aggExp.mode,
+            false),
+            AggregateExpression(Sum(Cast(
+              attrs.last,
+              DoubleType)),
+              aggExp.mode,
+              false))
+        }
       case Average(exp: Expression) =>
-        Divide(AggregateExpression(Sum(Cast(
-          attrs.head,
-          DoubleType)),
-          aggExp.mode,
-          isDistinct = false),
-          AggregateExpression(Sum(Cast(
-            attrs.last,
+        // for handling Normal table case/Aggregate node added in case of streaming table
+        if (!isStreamingTable) {
+          // In case of average aggregate function select 2 columns from aggregate table
+          // with aggregation sum and count.
+          // Then add divide(sum(column with sum), sum(column with count)).
+          Seq(Divide(AggregateExpression(Sum(Cast(
+            attrs.head,
             DoubleType)),
             aggExp.mode,
-            isDistinct = false))
+            false),
+            AggregateExpression(Sum(Cast(
+              attrs.last,
+              DoubleType)),
+              aggExp.mode,
+              false)))
+        } else {
+          // in case of streaming aggregate table return two aggregate function sum and count
+          Seq(AggregateExpression(Sum(Cast(
+            attrs.head,
+            DoubleType)),
+            aggExp.mode,
+            false),
+            AggregateExpression(Sum(Cast(
+              attrs.last,
+              DoubleType)),
+              aggExp.mode,
+              false))
+        }
     }
   }
   /**
@@ -1143,8 +1568,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
     } else {
         new QueryColumn(
           columnSchema.getColumnSchema,
-          isFilterColumn,
-          timeseriesFunction.toLowerCase)
+        isFilterColumn,
+        timeseriesFunction.toLowerCase)
     }
   }
 }
@@ -1167,59 +1592,59 @@ case class CarbonPreAggregateDataLoadingRules(sparkSession: SparkSession)
       aExp,
       CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
         if validateAggregateExpressions(aExp) &&
-        logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+           logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
         aExp.foreach {
           case attr: AttributeReference =>
-              namedExpressionList += attr
-            case alias@Alias(_: AttributeReference, _) =>
-              namedExpressionList += alias
-            case alias@Alias(aggExp: AggregateExpression, name) =>
-              // get the updated expression for avg convert it to two expression
-              // sum and count
-              val expressions = PreAggregateUtil.validateAggregateFunctionAndGetFields(aggExp)
-              // if size is more than one then it was for average
-              if(expressions.size > 1) {
-                val sumExp = PreAggregateUtil.normalizeExprId(
-                  expressions.head,
-                  aggregate.allAttributes)
-                // get the logical plan fro count expression
-                val countExp = PreAggregateUtil.normalizeExprId(
-                  expressions.last,
-                  aggregate.allAttributes)
-                // check with same expression already sum is present then do not add to
-                // named expression list otherwise update the list and add it to set
-                if (!validExpressionsMap.contains(AggExpToColumnMappingModel(sumExp))) {
-                  namedExpressionList +=
-                  Alias(expressions.head, name + "_ sum")(NamedExpression.newExprId,
-                    alias.qualifier,
-                    Some(alias.metadata),
-                    alias.isGenerated)
-                  validExpressionsMap += AggExpToColumnMappingModel(sumExp)
-                }
-                // check with same expression already count is present then do not add to
-                // named expression list otherwise update the list and add it to set
-                if (!validExpressionsMap.contains(AggExpToColumnMappingModel(countExp))) {
-                  namedExpressionList +=
-                  Alias(expressions.last, name + "_ count")(NamedExpression.newExprId,
-                    alias.qualifier,
-                    Some(alias.metadata),
-                    alias.isGenerated)
-                  validExpressionsMap += AggExpToColumnMappingModel(countExp)
-                }
-              } else {
-                // get the logical plan for expression
-                val exp = PreAggregateUtil.normalizeExprId(
-                  expressions.head,
-                  aggregate.allAttributes)
-                // check with same expression already  present then do not add to
-                // named expression list otherwise update the list and add it to set
-                if (!validExpressionsMap.contains(AggExpToColumnMappingModel(exp))) {
-                  namedExpressionList+=alias
-                  validExpressionsMap += AggExpToColumnMappingModel(exp)
-                }
+            namedExpressionList += attr
+          case alias@Alias(_: AttributeReference, _) =>
+            namedExpressionList += alias
+          case alias@Alias(aggExp: AggregateExpression, name) =>
+            // get the updated expression for avg convert it to two expression
+            // sum and count
+            val expressions = PreAggregateUtil.validateAggregateFunctionAndGetFields(aggExp)
+            // if size is more than one then it was for average
+            if(expressions.size > 1) {
+              val sumExp = PreAggregateUtil.normalizeExprId(
+                expressions.head,
+                aggregate.allAttributes)
+              // get the logical plan fro count expression
+              val countExp = PreAggregateUtil.normalizeExprId(
+                expressions.last,
+                aggregate.allAttributes)
+              // check with same expression already sum is present then do not add to
+              // named expression list otherwise update the list and add it to set
+              if (!validExpressionsMap.contains(AggExpToColumnMappingModel(sumExp))) {
+                namedExpressionList +=
+                Alias(expressions.head, name + "_ sum")(NamedExpression.newExprId,
+                  alias.qualifier,
+                  Some(alias.metadata),
+                  alias.isGenerated)
+                validExpressionsMap += AggExpToColumnMappingModel(sumExp)
+              }
+              // check with same expression already count is present then do not add to
+              // named expression list otherwise update the list and add it to set
+              if (!validExpressionsMap.contains(AggExpToColumnMappingModel(countExp))) {
+                namedExpressionList +=
+                Alias(expressions.last, name + "_ count")(NamedExpression.newExprId,
+                  alias.qualifier,
+                  Some(alias.metadata),
+                  alias.isGenerated)
+                validExpressionsMap += AggExpToColumnMappingModel(countExp)
+              }
+            } else {
+              // get the logical plan for expression
+              val exp = PreAggregateUtil.normalizeExprId(
+                expressions.head,
+                aggregate.allAttributes)
+              // check with same expression already  present then do not add to
+              // named expression list otherwise update the list and add it to set
+              if (!validExpressionsMap.contains(AggExpToColumnMappingModel(exp))) {
+                namedExpressionList+=alias
+                validExpressionsMap += AggExpToColumnMappingModel(exp)
               }
-            case alias@Alias(_: Expression, _) =>
-              namedExpressionList += alias
+            }
+          case alias@Alias(_: Expression, _) =>
+            namedExpressionList += alias
         }
         groupingExpressions foreach {
           case namedExpr: NamedExpression => namedExpressionList += namedExpr
@@ -1240,7 +1665,7 @@ case class CarbonPreAggregateDataLoadingRules(sparkSession: SparkSession)
   private def validateAggregateExpressions(namedExpression: Seq[NamedExpression]): Boolean = {
     val filteredExpressions = namedExpression.filterNot(_.isInstanceOf[UnresolvedAlias])
     filteredExpressions.exists { expr =>
-          !expr.name.equalsIgnoreCase("PreAgg") && expr.name.equalsIgnoreCase("preAggLoad")
-      }
+      !expr.name.equalsIgnoreCase("PreAgg") && expr.name.equalsIgnoreCase("preAggLoad")
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fb1516c0/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index 27c7d17..80bcac2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonCommonConstantsInternal}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, SessionParams}
 
 case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
@@ -81,7 +81,6 @@ case class CarbonSetCommand(command: SetCommand)
 
 object CarbonSetCommand {
   def validateAndSetValue(sessionParams: SessionParams, key: String, value: String): Unit = {
-
     val isCarbonProperty: Boolean = CarbonProperties.getInstance().isCarbonProperty(key)
     if (isCarbonProperty) {
       sessionParams.addProperty(key, value)
@@ -97,6 +96,8 @@ object CarbonSetCommand {
       }
     } else if (key.startsWith(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS)) {
       sessionParams.addProperty(key.toLowerCase(), value)
+    } else if (key.startsWith(CarbonCommonConstantsInternal.QUERY_ON_PRE_AGG_STREAMING)) {
+      sessionParams.addProperty(key.toLowerCase(), value)
     }
   }