You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/13 22:12:05 UTC

[39/49] carbondata git commit: [CARBONDATA-1572][Streaming] Add test case for streaming ingest

[CARBONDATA-1572][Streaming] Add test case for streaming ingest

Add test case for streaming ingest using socket stream

This closes #1485


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/40c31e80
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/40c31e80
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/40c31e80

Branch: refs/heads/fgdatamap
Commit: 40c31e8049784a53594896cd2e37cd2fb5135fbc
Parents: 80195da
Author: QiangCai <qi...@qq.com>
Authored: Fri Nov 10 19:57:17 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Mon Nov 13 19:10:21 2017 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   5 +
 .../core/statusmanager/LoadMetadataDetails.java |   6 +
 .../store/impl/FileFactoryImplUnitTest.java     |  44 ++
 .../streaming/CarbonStreamOutputFormat.java     |  28 +
 .../streaming/CarbonStreamRecordWriter.java     |  17 +-
 .../streaming/CarbonStreamInputFormatTest.java  |  97 +++
 .../streaming/CarbonStreamOutputFormatTest.java | 119 +++
 .../hadoop/test/util/StoreCreator.java          |  18 +-
 .../src/test/resources/streamSample.csv         |   6 +
 .../org/apache/carbondata/api/CarbonStore.scala |  45 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  10 +-
 .../org/apache/spark/sql/CarbonSource.scala     |   5 +
 .../TestStreamingTableOperation.scala           | 740 ++++++++++++++++++-
 .../processing/merger/CarbonDataMergerUtil.java |  27 +-
 .../streaming/segment/StreamSegment.java        |  13 +-
 .../streaming/StreamSinkFactory.scala           |  18 +-
 .../streaming/CarbonAppendableStreamSink.scala  |   8 +-
 17 files changed, 1161 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 5f63cc1..014478f 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -810,6 +810,11 @@ public final class CarbonCommonConstants {
   public static final String INVALID_SEGMENT_ID = "-1";
 
   /**
+   * default load time of the segment
+   */
+  public static final long SEGMENT_LOAD_TIME_DEFAULT = -1;
+
+  /**
    * Size of Major Compaction in MBs
    */
   @CarbonProperty

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
index 7748d17..51a04e4 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
@@ -84,6 +84,9 @@ public class LoadMetadataDetails implements Serializable {
   }
 
   public long getLoadEndTime() {
+    if (timestamp == null) {
+      return CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT;
+    }
     return convertTimeStampToLong(timestamp);
   }
 
@@ -161,6 +164,9 @@ public class LoadMetadataDetails implements Serializable {
    * @return the startLoadTime
    */
   public long getLoadStartTime() {
+    if (loadStartTime == null) {
+      return CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT;
+    }
     return convertTimeStampToLong(loadStartTime);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/FileFactoryImplUnitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/FileFactoryImplUnitTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/FileFactoryImplUnitTest.java
index 6fd27d3..65590d6 100644
--- a/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/FileFactoryImplUnitTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/FileFactoryImplUnitTest.java
@@ -18,8 +18,10 @@
 package org.apache.carbondata.core.carbon.datastorage.filesystem.store.impl;
 
 import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
 
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 
 import mockit.Mock;
@@ -147,5 +149,47 @@ public class FileFactoryImplUnitTest {
     FileFactory.getDataOutputStream(filePath, FileFactory.FileType.VIEWFS);
     assertNotNull(FileFactory.getCarbonFile(filePath, FileFactory.FileType.HDFS));
   }
+
+  @Test public void testTruncateFile() {
+    FileWriter writer = null;
+    String path = null;
+    try {
+      // generate a file
+      path = new File("target/truncatFile").getCanonicalPath();
+      writer = new FileWriter(path);
+      for (int i = 0; i < 4000; i++) {
+        writer.write("test truncate file method");
+      }
+      writer.close();
+      CarbonFile file = FileFactory.getCarbonFile(path);
+      assertTrue(file.getSize() == 100000L);
+
+      // truncate file to 4000 bytes
+      FileFactory.truncateFile(
+          path,
+          FileFactory.getFileType(path),
+          4000);
+      file = FileFactory.getCarbonFile(path);
+      assertEquals(file.getSize(), 4000L);
+    } catch (IOException e) {
+      e.printStackTrace();
+      assertTrue(e.getMessage(), false);
+    } finally {
+      if (writer != null) {
+        try {
+          writer.close();
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+      if (path != null) {
+        try {
+          FileFactory.deleteFile(path, FileFactory.getFileType(path));
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
index 1c21504..47b43c4 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
@@ -51,6 +51,24 @@ public class CarbonStreamOutputFormat extends FileOutputFormat<Void, Object> {
 
   private static final String LOAD_Model = "mapreduce.output.carbon.load.model";
 
+  private static final String SEGMENT_ID = "carbon.segment.id";
+
+  /**
+   * if the byte size of a streaming segment reaches this value,
+   * the system will create a new streaming segment
+   */
+  public static final String HANDOFF_SIZE = "carbon.handoff.size";
+
+  /**
+   * the minimum handoff size of a streaming segment, in bytes
+   */
+  public static final long HANDOFF_SIZE_MIN = 1024L * 1024 * 64;
+
+  /**
+   * the default handoff size of streaming segment, the unit is byte
+   */
+  public static final long HANDOFF_SIZE_DEFAULT = 1024L * 1024 * 1024;
+
   @Override public RecordWriter<Void, Object> getRecordWriter(TaskAttemptContext job)
       throws IOException, InterruptedException {
     return new CarbonStreamRecordWriter(job);
@@ -72,4 +90,14 @@ public class CarbonStreamOutputFormat extends FileOutputFormat<Void, Object> {
     }
   }
 
+  public static void setSegmentId(Configuration hadoopConf, String segmentId) throws IOException {
+    if (segmentId != null) {
+      hadoopConf.set(SEGMENT_ID, segmentId);
+    }
+  }
+
+  public static String getSegmentId(Configuration hadoopConf) throws IOException {
+    return hadoopConf.get(SEGMENT_ID);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
index 3c4b55c..7df87e3 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
@@ -75,6 +75,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
 
   // parser and converter
   private RowParser rowParser;
+  private BadRecordsLogger badRecordLogger;
   private RowConverter converter;
   private CarbonRow currentRow = new CarbonRow(null);
 
@@ -106,6 +107,8 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
       throw new IOException(
           "CarbonStreamRecordWriter require configuration: mapreduce.output.carbon.load.model");
     }
+    String segmentId = CarbonStreamOutputFormat.getSegmentId(hadoopConf);
+    carbonLoadModel.setSegmentId(segmentId);
     carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
     int taskNo = TaskID.forName(hadoopConf.get("mapred.tip.id")).getId();
     carbonLoadModel.setTaskNo("" + taskNo);
@@ -117,7 +120,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
 
     CarbonTablePath tablePath =
         CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier());
-    segmentDir = tablePath.getSegmentDir("0", carbonLoadModel.getSegmentId());
+    segmentDir = tablePath.getSegmentDir("0", segmentId);
     fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0");
   }
 
@@ -138,8 +141,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
 
     // initialize parser and converter
     rowParser = new RowParserImpl(dataFields, configuration);
-    BadRecordsLogger badRecordLogger =
-        DataConverterProcessorStepImpl.createBadRecordLogger(configuration);
+    badRecordLogger = DataConverterProcessorStepImpl.createBadRecordLogger(configuration);
     converter = new RowConverterImpl(configuration.getDataFields(), configuration, badRecordLogger);
     configuration.setCardinalityFinder(converter);
     converter.initialize();
@@ -285,14 +287,19 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
   @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException {
     try {
       // append remain buffer data
-      if (!hasException) {
+      if (!hasException && !isFirstRow) {
         appendBlockletToDataFile();
         converter.finish();
       }
     } finally {
       // close resource
       CarbonUtil.closeStreams(outputStream);
-      output.close();
+      if (output != null) {
+        output.close();
+      }
+      if (badRecordLogger != null) {
+        badRecordLogger.closeStreams();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
new file mode 100644
index 0000000..9970c50
--- /dev/null
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.spark.SparkHadoopWriter;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CarbonStreamInputFormatTest extends TestCase {
+
+  private TaskAttemptID taskAttemptId;
+  private TaskAttemptContext taskAttemptContext;
+  private Configuration hadoopConf;
+  private AbsoluteTableIdentifier identifier;
+  private String storePath;
+
+  @Override protected void setUp() throws Exception {
+    storePath = new File("target/stream_input").getCanonicalPath();
+    String dbName = "default";
+    String tableName = "stream_table_input";
+    identifier = new AbsoluteTableIdentifier(storePath,
+        new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
+
+    JobID jobId = SparkHadoopWriter.createJobID(new Date(), 0);
+    TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
+    taskAttemptId = new TaskAttemptID(taskId, 0);
+
+    hadoopConf = new Configuration();
+    taskAttemptContext = new TaskAttemptContextImpl(hadoopConf, taskAttemptId);
+  }
+
+  private InputSplit buildInputSplit() throws IOException {
+    CarbonInputSplit carbonInputSplit = new CarbonInputSplit();
+    List<CarbonInputSplit> splitList = new ArrayList<>();
+    splitList.add(carbonInputSplit);
+    return new CarbonMultiBlockSplit(identifier, splitList, new String[] { "localhost" },
+        FileFormat.rowformat);
+  }
+
+  @Test public void testCreateRecordReader() {
+    try {
+      InputSplit inputSplit = buildInputSplit();
+      CarbonStreamInputFormat inputFormat = new CarbonStreamInputFormat();
+      RecordReader recordReader = inputFormat.createRecordReader(inputSplit, taskAttemptContext);
+      Assert.assertNotNull("Failed to create record reader", recordReader);
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.assertTrue(e.getMessage(), false);
+    }
+  }
+
+  @Override protected void tearDown() throws Exception {
+    super.tearDown();
+    if (storePath != null) {
+      FileFactory.deleteAllFilesOfDir(new File(storePath));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java
new file mode 100644
index 0000000..daa2540
--- /dev/null
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Date;
+import java.util.UUID;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.hadoop.test.util.StoreCreator;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.spark.SparkHadoopWriter;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CarbonStreamOutputFormatTest extends TestCase {
+
+  private Configuration hadoopConf;
+  private TaskAttemptID taskAttemptId;
+  private CarbonLoadModel carbonLoadModel;
+  private String storePath;
+
+  @Override protected void setUp() throws Exception {
+    super.setUp();
+    JobID jobId = SparkHadoopWriter.createJobID(new Date(), 0);
+    TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
+    taskAttemptId = new TaskAttemptID(taskId, 0);
+
+    hadoopConf = new Configuration();
+    hadoopConf.set("mapred.job.id", jobId.toString());
+    hadoopConf.set("mapred.tip.id", taskAttemptId.getTaskID().toString());
+    hadoopConf.set("mapred.task.id", taskAttemptId.toString());
+    hadoopConf.setBoolean("mapred.task.is.map", true);
+    hadoopConf.setInt("mapred.task.partition", 0);
+
+    storePath = new File("target/stream_output").getCanonicalPath();
+    String dbName = "default";
+    String tableName = "stream_table_output";
+    AbsoluteTableIdentifier identifier = new AbsoluteTableIdentifier(storePath,
+        new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
+
+    CarbonTable table = StoreCreator.createTable(identifier);
+
+    String factFilePath = new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
+    carbonLoadModel = StoreCreator.buildCarbonLoadModel(table, factFilePath, identifier);
+  }
+
+  @Test public void testSetCarbonLoadModel() {
+    try {
+      CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
+    } catch (IOException e) {
+      Assert.assertTrue("Failed to config CarbonLoadModel for CarbonStreamOutputFromat", false);
+    }
+  }
+
+  @Test public void testGetCarbonLoadModel() {
+    try {
+      CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
+      CarbonLoadModel model = CarbonStreamOutputFormat.getCarbonLoadModel(hadoopConf);
+
+      Assert.assertNotNull("Failed to get CarbonLoadModel", model);
+      Assert.assertTrue("CarbonLoadModel should be same with previous",
+          carbonLoadModel.getFactTimeStamp() == model.getFactTimeStamp());
+
+    } catch (IOException e) {
+      Assert.assertTrue("Failed to get CarbonLoadModel for CarbonStreamOutputFromat", false);
+    }
+  }
+
+  @Test public void testGetRecordWriter() {
+    CarbonStreamOutputFormat outputFormat = new CarbonStreamOutputFormat();
+    try {
+      CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
+      TaskAttemptContext taskAttemptContext =
+          new TaskAttemptContextImpl(hadoopConf, taskAttemptId);
+      RecordWriter recordWriter = outputFormat.getRecordWriter(taskAttemptContext);
+      Assert.assertNotNull("Failed to get CarbonStreamRecordWriter", recordWriter);
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.assertTrue(e.getMessage(), false);
+    }
+  }
+
+  @Override protected void tearDown() throws Exception {
+    super.tearDown();
+    if (storePath != null) {
+      FileFactory.deleteAllFilesOfDir(new File(storePath));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index 29d8d03..dd48cd9 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -49,7 +49,6 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnIdentifier;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
-import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
@@ -116,7 +115,8 @@ public class StoreCreator {
     return absoluteTableIdentifier;
   }
 
-  public static CarbonLoadModel buildCarbonLoadModel(CarbonTable table, String factFilePath) {
+  public static CarbonLoadModel buildCarbonLoadModel(CarbonTable table, String factFilePath,
+      AbsoluteTableIdentifier absoluteTableIdentifier) {
     CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table);
     CarbonLoadModel loadModel = new CarbonLoadModel();
     loadModel.setCarbonDataLoadSchema(schema);
@@ -159,28 +159,28 @@ public class StoreCreator {
    * Create store without any restructure
    */
   public static void createCarbonStore() {
-
     try {
-
-      String factFilePath = new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
+      String factFilePath =
+          new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
       File storeDir = new File(absoluteTableIdentifier.getStorePath());
       CarbonUtil.deleteFoldersAndFiles(storeDir);
       CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS,
           absoluteTableIdentifier.getStorePath());
 
-      CarbonTable table = createTable();
+      CarbonTable table = createTable(absoluteTableIdentifier);
       writeDictionary(factFilePath, table);
-      CarbonLoadModel loadModel = buildCarbonLoadModel(table, factFilePath);
+      CarbonLoadModel loadModel =
+          buildCarbonLoadModel(table, factFilePath, absoluteTableIdentifier);
 
       executeGraph(loadModel, absoluteTableIdentifier.getStorePath());
 
     } catch (Exception e) {
       e.printStackTrace();
     }
-
   }
 
-  public static CarbonTable createTable() throws IOException {
+  public static CarbonTable createTable(
+      AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException {
     TableInfo tableInfo = new TableInfo();
     tableInfo.setStorePath(absoluteTableIdentifier.getStorePath());
     tableInfo.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/integration/spark-common-test/src/test/resources/streamSample.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/streamSample.csv b/integration/spark-common-test/src/test/resources/streamSample.csv
new file mode 100644
index 0000000..590ea90
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/streamSample.csv
@@ -0,0 +1,6 @@
+id,name,city,salary,file
+100000001,batch_1,city_1,0.1,school_1:school_11$20
+100000002,batch_2,city_2,0.2,school_2:school_22$30
+100000003,batch_3,city_3,0.3,school_3:school_33$40
+100000004,batch_4,city_4,0.4,school_4:school_44$50
+100000005,batch_5,city_5,0.5,school_5:school_55$60

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index a95bc01..054f778 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -64,21 +64,36 @@ object CarbonStore {
       }
 
       loadMetadataDetailsSortedArray
-          .filter(_.getVisibility.equalsIgnoreCase("true"))
-          .map { load =>
-            val mergedTo = if (load.getMergedLoadName != null) {
-         load.getMergedLoadName
-       } else {
-         ""
-       }
-            Row(
-              load.getLoadName,
-              load.getLoadStatus,
-              new java.sql.Timestamp(load.getLoadStartTime),
-              new java.sql.Timestamp(load.getLoadEndTime),
-              mergedTo
-            )
-          }.toSeq
+        .filter(_.getVisibility.equalsIgnoreCase("true"))
+        .map { load =>
+          val mergedTo = if (load.getMergedLoadName != null) {
+            load.getMergedLoadName
+          } else {
+            ""
+          }
+
+          val startTime =
+            if (load.getLoadStartTime == CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) {
+              null
+            } else {
+              new java.sql.Timestamp(load.getLoadStartTime)
+            }
+
+          val endTime =
+            if (load.getLoadEndTime == CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) {
+              null
+            } else {
+              new java.sql.Timestamp(load.getLoadEndTime)
+            }
+
+          Row(
+            load.getLoadName,
+            load.getLoadStatus,
+            startTime,
+            endTime,
+            mergedTo
+          )
+        }.toSeq
     } else {
       Seq.empty
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 1f88f25..c6edc2a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.mutate.UpdateVO
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
-import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
+import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentUpdateStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
@@ -307,7 +307,8 @@ class CarbonMergerRDD[K, V](
 
       // keep on assigning till last one is reached.
       if (null != splits && splits.size > 0) {
-        splitsOfLastSegment = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).toList.asJava
+        splitsOfLastSegment = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
+          .filter { split => FileFormat.carbondata.equals(split.getFileFormat) }.toList.asJava
       }
 
       carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter(entry => {
@@ -316,9 +317,10 @@ class CarbonMergerRDD[K, V](
           entry.getLocations, entry.getLength, entry.getVersion,
           updateStatusManager.getDeleteDeltaFilePath(entry.getPath.toString)
         )
-        ((!updated) || ((updated) && (!CarbonUtil
+        (((!updated) || ((updated) && (!CarbonUtil
           .isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,
-            updateDetails, updateStatusManager))))
+            updateDetails, updateStatusManager)))) &&
+         FileFormat.carbondata.equals(entry.getFileFormat))
       })
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 4580f22..de01c8d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -230,6 +230,11 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
         CarbonEnv.getInstance(sparkSession).carbonMetastore.
           createCarbonRelation(parameters, identifier, sparkSession).tableMeta.carbonTable
 
+      if (!carbonTable.isStreamingTable) {
+        throw new CarbonStreamException(s"Table ${carbonTable.getDatabaseName}." +
+                                        s"${carbonTable.getFactTableName} is not a streaming table")
+      }
+
       // create sink
       StreamSinkFactory.createStreamTableSink(
         sqlContext.sparkSession,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index d5f9426..ffa977b 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -17,15 +17,30 @@
 
 package org.apache.spark.carbondata
 
-import org.apache.spark.sql.catalyst.parser.ParseException
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+import java.util.concurrent.Executors
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.{CarbonEnv, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
 import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.types.StructType
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
+
+  private val spark = sqlContext.sparkSession
+  private val dataFilePath = s"$resourcesPath/streamSample.csv"
+
   override def beforeAll {
     sql("DROP DATABASE IF EXISTS streaming CASCADE")
     sql("CREATE DATABASE streaming")
@@ -41,6 +56,52 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
         | TBLPROPERTIES ('streaming' = 'true')
       """.stripMargin)
     sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO TABLE source""")
+
+    dropTable()
+
+    // 1. normal table not support streaming ingest
+    createTable(tableName = "batch_table", streaming = false, withBatchLoad = true)
+
+    // 2. streaming table with different input source
+    // socket source
+    createTable(tableName = "stream_table_socket", streaming = true, withBatchLoad = true)
+    // file source
+    createTable(tableName = "stream_table_file", streaming = true, withBatchLoad = true)
+
+    // 3. streaming table with bad records
+    createTable(tableName = "bad_record_force", streaming = true, withBatchLoad = true)
+    createTable(tableName = "bad_record_fail", streaming = true, withBatchLoad = true)
+
+    // 4. streaming frequency check
+    createTable(tableName = "stream_table_1s", streaming = true, withBatchLoad = true)
+    createTable(tableName = "stream_table_10s", streaming = true, withBatchLoad = true)
+
+    // 5. streaming table execute batch loading
+    createTable(tableName = "stream_table_batch", streaming = true, withBatchLoad = false)
+
+    // 6. detail query
+    // full scan
+    createTable(tableName = "stream_table_scan", streaming = true, withBatchLoad = true)
+    createTableWithComplexType(
+      tableName = "stream_table_scan_complex", streaming = true, withBatchLoad = true)
+    // filter scan
+    createTable(tableName = "stream_table_filter", streaming = true, withBatchLoad = true)
+    createTableWithComplexType(
+      tableName = "stream_table_filter_complex", streaming = true, withBatchLoad = true)
+
+    // 7. aggregate query
+    createTable(tableName = "stream_table_agg", streaming = true, withBatchLoad = true)
+    createTableWithComplexType(
+      tableName = "stream_table_agg_complex", streaming = true, withBatchLoad = true)
+
+    // 8. compaction
+    createTable(tableName = "stream_table_compact", streaming = true, withBatchLoad = true)
+
+    // 9. create new stream segment if current stream segment is full
+    createTable(tableName = "stream_table_new", streaming = true, withBatchLoad = true)
+
+    // 10. fault tolerant
+    createTable(tableName = "stream_table_tolerant", streaming = true, withBatchLoad = true)
   }
 
   test("validate streaming property") {
@@ -96,7 +157,682 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
   }
 
   override def afterAll {
+    dropTable()
     sql("USE default")
     sql("DROP DATABASE IF EXISTS streaming CASCADE")
   }
+
+  def dropTable(): Unit = {
+    sql("drop table if exists streaming.batch_table")
+    sql("drop table if exists streaming.stream_table_socket")
+    sql("drop table if exists streaming.stream_table_file")
+    sql("drop table if exists streaming.bad_record_force")
+    sql("drop table if exists streaming.bad_record_fail")
+    sql("drop table if exists streaming.stream_table_1s")
+    sql("drop table if exists streaming.stream_table_10s")
+    sql("drop table if exists streaming.stream_table_batch")
+    sql("drop table if exists streaming.stream_table_scan")
+    sql("drop table if exists streaming.stream_table_scan_complex")
+    sql("drop table if exists streaming.stream_table_filter")
+    sql("drop table if exists streaming.stream_table_filter_complex")
+    sql("drop table if exists streaming.stream_table_agg")
+    sql("drop table if exists streaming.stream_table_agg_complex")
+    sql("drop table if exists streaming.stream_table_compact")
+    sql("drop table if exists streaming.stream_table_new")
+    sql("drop table if exists streaming.stream_table_tolerant")
+  }
+
+  // normal table not support streaming ingest
+  test("normal table not support streaming ingest") {
+    val identifier = new TableIdentifier("batch_table", Option("streaming"))
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+    var server: ServerSocket = null
+    // use thread pool to catch the exception of sink thread
+    val pool = Executors.newSingleThreadExecutor()
+    try {
+      server = new ServerSocket(7071)
+      val thread1 = createWriteSocketThread(server, 2, 10, 1)
+      thread1.start()
+      val future = pool.submit(createSocketStreamingThread(spark, tablePath))
+      Thread.sleep(1000)
+      thread1.interrupt()
+      try {
+        future.get()
+        assert(false)
+      } catch {
+        case ex: java.util.concurrent.ExecutionException =>
+          assert(ex.getMessage.contains("is not a streaming table"))
+      }
+    } finally {
+      pool.shutdownNow()
+      if (server != null) {
+        server.close()
+      }
+    }
+  }
+
+  // input source: socket
+  test("streaming ingest from socket source") {
+    executeStreamingIngest(
+      tableName = "stream_table_socket",
+      batchNums = 2,
+      rowNumsEachBatch = 10,
+      intervalOfSource = 1,
+      intervalOfIngest = 1,
+      continueSeconds = 10,
+      generateBadRecords = false,
+      badRecordAction = "force"
+    )
+
+    checkAnswer(
+      sql("select count(*) from streaming.stream_table_socket"),
+      Seq(Row(25))
+    )
+  }
+
+  // input source: file
+  test("streaming ingest from file source") {
+    val identifier = new TableIdentifier("stream_table_file", Option("streaming"))
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+    val csvDataDir = new File("target/csvdata").getCanonicalPath
+    // streaming ingest 10 rows
+    generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir)
+    val thread = createFileStreamingThread(spark, tablePath, csvDataDir, intervalSecond = 1)
+    thread.start()
+    Thread.sleep(2000)
+    generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir)
+    Thread.sleep(10000)
+    thread.interrupt()
+    checkAnswer(
+      sql("select count(*) from streaming.stream_table_file"),
+      Seq(Row(25))
+    )
+  }
+
+  // bad records
+  test("streaming table with bad records action: force") {
+    executeStreamingIngest(
+      tableName = "bad_record_force",
+      batchNums = 2,
+      rowNumsEachBatch = 10,
+      intervalOfSource = 1,
+      intervalOfIngest = 1,
+      continueSeconds = 10,
+      generateBadRecords = true,
+      badRecordAction = "force"
+    )
+    // verify the table that was ingested: 5 batch-loaded rows + 2 * 10 streamed rows
+    checkAnswer(
+      sql("select count(*) from streaming.bad_record_force"),
+      Seq(Row(25))
+    )
+  }
+
+  test("streaming table with bad records action: fail") {
+    executeStreamingIngest(
+      tableName = "bad_record_fail",
+      batchNums = 2,
+      rowNumsEachBatch = 10,
+      intervalOfSource = 1,
+      intervalOfIngest = 1,
+      continueSeconds = 10,
+      generateBadRecords = true,
+      badRecordAction = "fail"
+    )
+    val result = sql("select count(*) from streaming.bad_record_fail").collect()
+    assert(result(0).getLong(0) < 25)
+  }
+
+  // ingest with different interval
+  test("1 row per 1 second interval") {
+    executeStreamingIngest(
+      tableName = "stream_table_1s",
+      batchNums = 20,
+      rowNumsEachBatch = 1,
+      intervalOfSource = 1,
+      intervalOfIngest = 1,
+      continueSeconds = 20,
+      generateBadRecords = false,
+      badRecordAction = "force"
+    )
+    val result = sql("select count(*) from streaming.stream_table_1s").collect()
+    // ingest may lag behind the source within 20 seconds, so only a lower bound is asserted
+    assert(result(0).getLong(0) > 5 + 10)
+  }
+
+  test("10000 row per 10 seconds interval") {
+    executeStreamingIngest(
+      tableName = "stream_table_10s",
+      batchNums = 5,
+      rowNumsEachBatch = 10000,
+      intervalOfSource = 5,
+      intervalOfIngest = 10,
+      continueSeconds = 50,
+      generateBadRecords = false,
+      badRecordAction = "force"
+    )
+    checkAnswer(
+      sql("select count(*) from streaming.stream_table_10s"),
+      Seq(Row(5 + 10000*5)))
+  }
+
+  // batch loading on streaming table
+  test("streaming table execute batch loading") {
+    executeStreamingIngest(
+      tableName = "stream_table_batch",
+      batchNums = 5,
+      rowNumsEachBatch = 100,
+      intervalOfSource = 3,
+      intervalOfIngest = 5,
+      continueSeconds = 20,
+      generateBadRecords = false,
+      badRecordAction = "force"
+    )
+    checkAnswer(
+      sql("select count(*) from streaming.stream_table_batch"),
+      Seq(Row(100*5)))
+
+    executeBatchLoad("stream_table_batch")
+
+    checkAnswer(
+      sql("select count(*) from streaming.stream_table_batch"),
+      Seq(Row(100*5 + 5)))
+  }
+
+  // detail query on batch and stream segment
+  test("non-filter query on stream table with dictionary, sort_columns") {
+    executeStreamingIngest(
+      tableName = "stream_table_scan",
+      batchNums = 5,
+      rowNumsEachBatch = 10,
+      intervalOfSource = 2,
+      intervalOfIngest = 4,
+      continueSeconds = 20,
+      generateBadRecords = false,
+      badRecordAction = "force"
+    )
+
+    val result = sql("select * from streaming.stream_table_scan order by id").collect()
+    assert(result != null)
+    assert(result.length == 55)
+    // check one row of streaming data
+    assert(result(0).getInt(0) == 1)
+    assert(result(0).getString(1) == "name_1")
+    // check one row of batch loading
+    assert(result(50).getInt(0) == 100000001)
+    assert(result(50).getString(1) == "batch_1")
+  }
+
+  test("non-filter query on stream table with dictionary, sort_columns and complex column") {
+    executeStreamingIngest(
+      tableName = "stream_table_scan_complex",
+      batchNums = 5,
+      rowNumsEachBatch = 10,
+      intervalOfSource = 2,
+      intervalOfIngest = 4,
+      continueSeconds = 20,
+      generateBadRecords = false,
+      badRecordAction = "force"
+    )
+
+    val result = sql("select * from streaming.stream_table_scan_complex order by id").collect()
+    assert(result != null)
+    assert(result.length == 55)
+    // check one row of streaming data
+    assert(result(0).getInt(0) == 1)
+    assert(result(0).getString(1) == "name_1")
+    assert(result(0).getStruct(4).getInt(1) == 1)
+    // check one row of batch loading
+    assert(result(50).getInt(0) == 100000001)
+    assert(result(50).getString(1) == "batch_1")
+    assert(result(50).getStruct(4).getInt(1) == 20)
+  }
+
+  test("filter query on stream table with dictionary, sort_columns") {
+    executeStreamingIngest(
+      tableName = "stream_table_filter",
+      batchNums = 5,
+      rowNumsEachBatch = 10,
+      intervalOfSource = 2,
+      intervalOfIngest = 4,
+      continueSeconds = 20,
+      generateBadRecords = true,
+      badRecordAction = "force"
+    )
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id = 1"),
+      Seq(Row(1, "name_1", "city_1", 10000.0)))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where name = 'name_2'"),
+      Seq(Row(2, "name_2", "", 20000.0)))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where city = 'city_1'"),
+      Seq(Row(1, "name_1", "city_1", 10000.0),
+        Row(100000001, "batch_1", "city_1", 0.1)))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id > 49 and id < 100000002"),
+      Seq(Row(50, "name_50", "city_50", 500000.0),
+        Row(100000001, "batch_1", "city_1", 0.1)))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where id is null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0)))
+
+    checkAnswer(
+      sql("select * from stream_table_filter where city = ''"),
+      Seq(Row(2, "name_2", "", 20000.0)))
+
+  }
+
+  test("filter query on stream table with dictionary, sort_columns and complex column") {
+    executeStreamingIngest(
+      tableName = "stream_table_filter_complex",
+      batchNums = 5,
+      rowNumsEachBatch = 10,
+      intervalOfSource = 2,
+      intervalOfIngest = 4,
+      continueSeconds = 20,
+      generateBadRecords = true,
+      badRecordAction = "force"
+    )
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id = 1"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, Row(wrap(Array("school_1", "school_11")), 1))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where name = 'name_2'"),
+      Seq(Row(2, "name_2", "", 20000.0, Row(wrap(Array("school_2", "school_22")), 2))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where file.age = 3"),
+      Seq(Row(3, "name_3", "city_3", 30000.0, Row(wrap(Array("school_3", "school_33")), 3))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where city = 'city_1'"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, Row(wrap(Array("school_1", "school_11")), 1)),
+        Row(100000001, "batch_1", "city_1", 0.1, Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id > 49 and id < 100000002"),
+      Seq(Row(50, "name_50", "city_50", 500000.0, Row(wrap(Array("school_50", "school_5050")), 50)),
+        Row(100000001, "batch_1", "city_1", 0.1, Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where id is null"),
+      Seq(Row(null, "name_6", "city_6", 60000.0, Row(wrap(Array("school_6", "school_66")), 6))))
+
+    checkAnswer(
+      sql("select * from stream_table_filter_complex where city = ''"),
+      Seq(Row(2, "name_2", "", 20000.0, Row(wrap(Array("school_2", "school_22")), 2))))
+
+  }
+
+  // aggregation
+  test("aggregation query") {
+    executeStreamingIngest(
+      tableName = "stream_table_agg",
+      batchNums = 5,
+      rowNumsEachBatch = 10,
+      intervalOfSource = 2,
+      intervalOfIngest = 4,
+      continueSeconds = 20,
+      generateBadRecords = true,
+      badRecordAction = "force"
+    )
+
+    checkAnswer(
+      sql("select count(*), max(id), min(name), cast(avg(id) as integer), sum(id) " +
+          "from stream_table_agg where id >= 2 and id <= 100000004"),
+      Seq(Row(52, 100000004, "batch_1", 7692332, 400001278)))
+
+    checkAnswer(
+      sql("select city, count(id), sum(id), cast(avg(id) as integer), " +
+          "max(salary), min(salary) " +
+          "from stream_table_agg " +
+          "where name in ('batch_1', 'batch_2', 'batch_3', 'name_1', 'name_2', 'name_3') " +
+          "and city <> '' " +
+          "group by city " +
+          "order by city"),
+      Seq(Row("city_1", 2, 100000002, 50000001, 10000.0, 0.1),
+        Row("city_2", 1, 100000002, 100000002, 0.2, 0.2),
+        Row("city_3", 2, 100000006, 50000003, 30000.0, 0.3)))
+  }
+
+  test("aggregation query with complex") {
+    executeStreamingIngest(
+      tableName = "stream_table_agg_complex",
+      batchNums = 5,
+      rowNumsEachBatch = 10,
+      intervalOfSource = 2,
+      intervalOfIngest = 4,
+      continueSeconds = 20,
+      generateBadRecords = true,
+      badRecordAction = "force"
+    )
+
+    checkAnswer(
+      sql("select count(*), max(id), min(name), cast(avg(file.age) as integer), sum(file.age) " +
+          "from stream_table_agg_complex where id >= 2 and id <= 100000004"),
+      Seq(Row(52, 100000004, "batch_1", 27, 1408)))
+
+    checkAnswer(
+      sql("select city, count(id), sum(id), cast(avg(file.age) as integer), " +
+          "max(salary), min(salary) " +
+          "from stream_table_agg_complex " +
+          "where name in ('batch_1', 'batch_2', 'batch_3', 'name_1', 'name_2', 'name_3') " +
+          "and city <> '' " +
+          "group by city " +
+          "order by city"),
+      Seq(Row("city_1", 2, 100000002, 10, 10000.0, 0.1),
+        Row("city_2", 1, 100000002, 30, 0.2, 0.2),
+        Row("city_3", 2, 100000006, 21, 30000.0, 0.3)))
+  }
+
+  // compaction
+  test("test compaction on stream table") {
+    executeStreamingIngest(
+      tableName = "stream_table_compact",
+      batchNums = 5,
+      rowNumsEachBatch = 10,
+      intervalOfSource = 2,
+      intervalOfIngest = 4,
+      continueSeconds = 20,
+      generateBadRecords = false,
+      badRecordAction = "force"
+    )
+    for (_ <- 0 to 3) {
+      executeBatchLoad("stream_table_compact")
+    }
+
+    sql("alter table streaming.stream_table_compact compact 'minor'")
+
+    val result = sql("show segments for table streaming.stream_table_compact").collect()
+    result.foreach { row =>
+      if (row.getString(0).equals("1")) {
+        assert(row.getString(1).equals(CarbonCommonConstants.STORE_LOADSTATUS_STREAMING))
+      }
+    }
+  }
+
+  // stream segment max size
+  test("create new stream segment if current stream segment is full") {
+    executeStreamingIngest(
+      tableName = "stream_table_new",
+      batchNums = 6,
+      rowNumsEachBatch = 10000,
+      intervalOfSource = 5,
+      intervalOfIngest = 10,
+      continueSeconds = 40,
+      generateBadRecords = false,
+      badRecordAction = "force",
+      handoffSize = 1024L * 200
+    )
+    sql("show segments for table streaming.stream_table_new").show(100, false)
+
+    assert(sql("show segments for table streaming.stream_table_new").count() == 4)
+
+    checkAnswer(
+      sql("select count(*) from streaming.stream_table_new"),
+      Seq(Row(5 + 10000 * 6))
+    )
+  }
+
+  def createWriteSocketThread(
+      serverSocket: ServerSocket,
+      writeNums: Int,
+      rowNums: Int,
+      intervalSecond: Int,
+      badRecords: Boolean = false): Thread = {
+    new Thread() {
+      override def run(): Unit = {
+        // block until a client connects to the server socket, then accept the connection
+        val clientSocket = serverSocket.accept()
+        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+        var index = 0
+        for (_ <- 1 to writeNums) {
+          // write rowNums records per iteration, then flush and sleep intervalSecond seconds
+          for (_ <- 1 to rowNums) {
+            index = index + 1
+            if (badRecords) {
+              if (index == 2) {
+                // null value
+                socketWriter.println(index.toString + ",name_" + index
+                                     + ",," + (10000.00 * index).toString +
+                                     ",school_" + index + ":school_" + index + index + "$" + index)
+              } else if (index == 6) {
+                // illegal number
+                socketWriter.println(index.toString + "abc,name_" + index
+                                     + ",city_" + index + "," + (10000.00 * index).toString +
+                                     ",school_" + index + ":school_" + index + index + "$" + index)
+              } else {
+                socketWriter.println(index.toString + ",name_" + index
+                                     + ",city_" + index + "," + (10000.00 * index).toString +
+                                     ",school_" + index + ":school_" + index + index + "$" + index)
+              }
+            } else {
+              socketWriter.println(index.toString + ",name_" + index
+                                   + ",city_" + index + "," + (10000.00 * index).toString +
+                                   ",school_" + index + ":school_" + index + index + "$" + index)
+            }
+          }
+          socketWriter.flush()
+          Thread.sleep(1000 * intervalSecond)
+        }
+        socketWriter.close()
+      }
+    }
+  }
+
+  def createSocketStreamingThread(
+      spark: SparkSession,
+      tablePath: CarbonTablePath,
+      badRecordAction: String = "force",
+      intervalSecond: Int = 2,
+      handoffSize: Long = CarbonStreamOutputFormat.HANDOFF_SIZE_DEFAULT): Thread = {
+    new Thread() {
+      override def run(): Unit = {
+        var qry: StreamingQuery = null
+        try {
+          val readSocketDF = spark.readStream
+            .format("socket")
+            .option("host", "localhost")
+            .option("port", 7071)
+            .load()
+
+          // Write data from socket stream to carbondata file
+          qry = readSocketDF.writeStream
+            .format("carbondata")
+            .trigger(ProcessingTime(s"$intervalSecond seconds"))
+            .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+            .option("tablePath", tablePath.getPath)
+            .option("bad_records_action", badRecordAction)
+            .option(CarbonStreamOutputFormat.HANDOFF_SIZE, handoffSize)
+            .start()
+          qry.awaitTermination()
+        } catch {
+          case ex =>
+            throw new Exception(ex.getMessage, ex)
+        } finally {
+          if (null != qry) {
+            qry.stop()
+          }
+        }
+      }
+    }
+  }
+
+  def executeStreamingIngest(
+      tableName: String,
+      batchNums: Int,
+      rowNumsEachBatch: Int,
+      intervalOfSource: Int,
+      intervalOfIngest: Int,
+      continueSeconds: Int,
+      generateBadRecords: Boolean,
+      badRecordAction: String,
+      handoffSize: Long = CarbonStreamOutputFormat.HANDOFF_SIZE_DEFAULT
+  ): Unit = {
+    val identifier = new TableIdentifier(tableName, Option("streaming"))
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+    var server: ServerSocket = null
+    try {
+      server = new ServerSocket(7071)
+
+      val thread1 = createWriteSocketThread(
+        serverSocket = server,
+        writeNums = batchNums,
+        rowNums = rowNumsEachBatch,
+        intervalSecond = intervalOfSource,
+        badRecords = generateBadRecords)
+      val thread2 = createSocketStreamingThread(
+        spark = spark,
+        tablePath = tablePath,
+        badRecordAction = badRecordAction,
+        intervalSecond = intervalOfIngest,
+        handoffSize = handoffSize)
+      thread1.start()
+      thread2.start()
+      Thread.sleep(continueSeconds * 1000)
+      thread1.interrupt()
+      thread2.interrupt()
+    } finally {
+      if (null != server) {
+        server.close()
+      }
+    }
+  }
+
+  def generateCSVDataFile(
+      spark: SparkSession,
+      idStart: Int,
+      rowNums: Int,
+      csvDirPath: String): Unit = {
+    // Create csv data frame file
+    val csvRDD = spark.sparkContext.parallelize(idStart until idStart + rowNums)
+      .map { id =>
+        (id,
+          "name_" + id,
+          "city_" + id,
+          10000.00 * id,
+          "school_" + id + ":school_" + id + id + "$" + id)
+      }
+    val csvDataDF = spark.createDataFrame(csvRDD).toDF("id", "name", "city", "salary", "file")
+
+    csvDataDF.write
+      .option("header", "false")
+      .mode(SaveMode.Overwrite)
+      .csv(csvDirPath)
+  }
+
+  def createFileStreamingThread(
+      spark: SparkSession,
+      tablePath: CarbonTablePath,
+      csvDataDir: String,
+      intervalSecond: Int): Thread = {
+    new Thread() {
+      override def run(): Unit = {
+        val inputSchema = new StructType()
+          .add("id", "integer")
+          .add("name", "string")
+          .add("city", "string")
+          .add("salary", "float")
+          .add("file", "string")
+        var qry: StreamingQuery = null
+        try {
+          val readSocketDF = spark.readStream
+            .format("csv")
+            .option("sep", ",")
+            .schema(inputSchema)
+            .option("path", csvDataDir)
+            .option("header", "false")
+            .load()
+
+          // Write data from the csv file stream (read from csvDataDir) to carbondata file
+          qry = readSocketDF.writeStream
+            .format("carbondata")
+            .trigger(ProcessingTime(s"${ intervalSecond } seconds"))
+            .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+            .option("tablePath", tablePath.getPath)
+            .start()
+
+          qry.awaitTermination()
+        } catch {
+          case _: InterruptedException =>
+            println("Done reading and writing streaming data")
+        } finally {
+          if (qry != null) {
+            qry.stop()
+          }
+        }
+      }
+    }
+  }
+
+  def createTable(tableName: String, streaming: Boolean, withBatchLoad: Boolean): Unit = {
+    sql(
+      s"""
+         | CREATE TABLE streaming.$tableName(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" }
+         | 'sort_columns'='name', 'dictionary_include'='city')
+         | """.stripMargin)
+
+    if (withBatchLoad) {
+      // batch loading 5 rows
+      executeBatchLoad(tableName)
+    }
+  }
+
+  def createTableWithComplexType(
+      tableName: String,
+      streaming: Boolean,
+      withBatchLoad: Boolean): Unit = {
+    sql(
+      s"""
+         | CREATE TABLE streaming.$tableName(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT,
+         | file struct<school:array<string>, age:int>
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" }
+         | 'sort_columns'='name', 'dictionary_include'='city')
+         | """.stripMargin)
+
+    if (withBatchLoad) {
+      // batch loading 5 rows
+      executeBatchLoad(tableName)
+    }
+  }
+
+  def executeBatchLoad(tableName: String): Unit = {
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$dataFilePath'
+         | INTO TABLE streaming.$tableName
+         | OPTIONS('HEADER'='true')
+         """.stripMargin)
+  }
+
+  def wrap(array: Array[String]) = {
+    new mutable.WrappedArray.ofRef(array)
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 53add22..45fcbb0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -478,6 +478,12 @@ public final class CarbonDataMergerUtil {
       Date segDate1 = null;
       SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
       for (LoadMetadataDetails segment : listOfSegmentsBelowThresholdSize) {
+        // compaction should skip streaming segments
+        if (CarbonCommonConstants.STORE_LOADSTATUS_STREAMING.equals(segment.getLoadStatus()) ||
+            CarbonCommonConstants.STORE_LOADSTATUS_STREAMING_FINISH.equals(
+                segment.getLoadStatus())) {
+          continue;
+        }
 
         if (first) {
           segDate1 = initializeFirstSegment(loadsOfSameDate, segment, sdf);
@@ -506,7 +512,15 @@ public final class CarbonDataMergerUtil {
         }
       }
     } else {
-      return listOfSegmentsBelowThresholdSize;
+      for (LoadMetadataDetails segment : listOfSegmentsBelowThresholdSize) {
+        // compaction should skip streaming segments
+        if (CarbonCommonConstants.STORE_LOADSTATUS_STREAMING.equals(segment.getLoadStatus()) ||
+            CarbonCommonConstants.STORE_LOADSTATUS_STREAMING_FINISH.equals(
+                segment.getLoadStatus())) {
+          continue;
+        }
+        loadsOfSameDate.add(segment);
+      }
     }
 
     return loadsOfSameDate;
@@ -583,6 +597,11 @@ public final class CarbonDataMergerUtil {
 
     // check size of each segment , sum it up across partitions
     for (LoadMetadataDetails segment : listOfSegmentsAfterPreserve) {
+      // compaction should skip streaming segments
+      if (CarbonCommonConstants.STORE_LOADSTATUS_STREAMING.equals(segment.getLoadStatus()) ||
+          CarbonCommonConstants.STORE_LOADSTATUS_STREAMING_FINISH.equals(segment.getLoadStatus())) {
+        continue;
+      }
 
       String segId = segment.getLoadName();
       // variable to store one  segment size across partition.
@@ -688,7 +707,11 @@ public final class CarbonDataMergerUtil {
 
     // check size of each segment , sum it up across partitions
     for (LoadMetadataDetails segment : listOfSegmentsAfterPreserve) {
-
+      // compaction should skip streaming segments
+      if (CarbonCommonConstants.STORE_LOADSTATUS_STREAMING.equals(segment.getLoadStatus()) ||
+          CarbonCommonConstants.STORE_LOADSTATUS_STREAMING_FINISH.equals(segment.getLoadStatus())) {
+        continue;
+      }
       String segName = segment.getLoadName();
 
       // if a segment is already merged 2 levels then it s name will become .2

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index 32ba332..d65f24d 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -53,8 +53,6 @@ public class StreamSegment {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(StreamSegment.class.getName());
 
-  public static final long STREAM_SEGMENT_MAX_SIZE = 1024L * 1024 * 1024;
-
   /**
    * get stream segment or create new stream segment if not exists
    */
@@ -136,7 +134,8 @@ public class StreamSegment {
             "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
                 + " for stream table finish segment");
 
-        LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(tablePath.getPath());
+        LoadMetadataDetails[] details =
+            SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath());
         for (LoadMetadataDetails detail : details) {
           if (segmentId.equals(detail.getLoadName())) {
             detail.setLoadEndTime(System.currentTimeMillis());
@@ -187,7 +186,7 @@ public class StreamSegment {
       TaskAttemptContext job) throws Exception {
     CarbonStreamRecordWriter writer = null;
     try {
-      writer = (CarbonStreamRecordWriter)new CarbonStreamOutputFormat().getRecordWriter(job);
+      writer = (CarbonStreamRecordWriter) new CarbonStreamOutputFormat().getRecordWriter(job);
       // at the begin of each task, should recover file if necessary
       // here can reuse some information of record writer
       recoverFileIfRequired(
@@ -199,6 +198,12 @@ public class StreamSegment {
         writer.write(null, inputIterators.next());
       }
       inputIterators.close();
+    } catch (Exception ex) {
+      if (writer != null) {
+        LOGGER.error(ex, "Failed to append batch data to stream segment: " +
+            writer.getSegmentDir());
+      }
+      throw ex;
     } finally {
       if (writer != null) {
         writer.close(job);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index 3ac19d9..31ed1f6 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -23,7 +23,6 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, Sink}
 
-import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
@@ -31,6 +30,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.spark.util.DataLoadingUtil
 import org.apache.carbondata.streaming.segment.StreamSegment
@@ -76,7 +76,21 @@ object StreamSinkFactory {
   }
 
   private def validateParameters(parameters: Map[String, String]): Unit = {
-    // TODO require to validate parameters
+    // The handoff size is optional. When present it must parse as a long that is at least
+    // HANDOFF_SIZE_MIN; any other value is rejected with a CarbonStreamException.
+    parameters.get(CarbonStreamOutputFormat.HANDOFF_SIZE).foreach { segmentSize =>
+      val value = try {
+        java.lang.Long.parseLong(segmentSize)
+      } catch {
+        case _: NumberFormatException =>
+          throw new CarbonStreamException(CarbonStreamOutputFormat.HANDOFF_SIZE +
+                                          s" $segmentSize is an illegal number")
+      }
+      if (value < CarbonStreamOutputFormat.HANDOFF_SIZE_MIN) {
+        throw new CarbonStreamException(CarbonStreamOutputFormat.HANDOFF_SIZE +
+          " should be bigger than or equal to " + CarbonStreamOutputFormat.HANDOFF_SIZE_MIN)
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 844423a..b3f0964 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -68,6 +68,10 @@ class CarbonAppendableStreamSink(
     }
     conf
   }
+  // max size of a streaming segment in bytes; a segment at or above it is closed and replaced
+  private val segmentMaxSize = hadoopConf.getLong(
+    CarbonStreamOutputFormat.HANDOFF_SIZE,
+    CarbonStreamOutputFormat.HANDOFF_SIZE_DEFAULT)
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
     if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
@@ -107,8 +111,7 @@ class CarbonAppendableStreamSink(
   private def checkOrHandOffSegment(): Unit = {
     val segmentDir = carbonTablePath.getSegmentDir("0", currentSegmentId)
     val fileType = FileFactory.getFileType(segmentDir)
-    if (StreamSegment.STREAM_SEGMENT_MAX_SIZE <=
-        StreamSegment.size(segmentDir)) {
+    if (segmentMaxSize <= StreamSegment.size(segmentDir)) {
       val newSegmentId =
         StreamSegment.close(carbonTable, currentSegmentId)
       currentSegmentId = newSegmentId
@@ -238,6 +241,7 @@ object CarbonAppendableStreamSink {
     val taskAttemptContext: TaskAttemptContext = {
       // Set up the configuration object
       val hadoopConf = description.serializableHadoopConf.value
+      CarbonStreamOutputFormat.setSegmentId(hadoopConf, description.segmentId)
       hadoopConf.set("mapred.job.id", jobId.toString)
       hadoopConf.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
       hadoopConf.set("mapred.task.id", taskAttemptId.toString)