Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/11/13 11:10:37 UTC
carbondata git commit: [CARBONDATA-1572][Streaming] Add test case for streaming ingest
Repository: carbondata
Updated Branches:
refs/heads/master 80195da41 -> 40c31e804
[CARBONDATA-1572][Streaming] Add test case for streaming ingest
Add test cases for streaming ingest using a socket stream
This closes #1485
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/40c31e80
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/40c31e80
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/40c31e80
Branch: refs/heads/master
Commit: 40c31e8049784a53594896cd2e37cd2fb5135fbc
Parents: 80195da
Author: QiangCai <qi...@qq.com>
Authored: Fri Nov 10 19:57:17 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Mon Nov 13 19:10:21 2017 +0800
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 5 +
.../core/statusmanager/LoadMetadataDetails.java | 6 +
.../store/impl/FileFactoryImplUnitTest.java | 44 ++
.../streaming/CarbonStreamOutputFormat.java | 28 +
.../streaming/CarbonStreamRecordWriter.java | 17 +-
.../streaming/CarbonStreamInputFormatTest.java | 97 +++
.../streaming/CarbonStreamOutputFormatTest.java | 119 +++
.../hadoop/test/util/StoreCreator.java | 18 +-
.../src/test/resources/streamSample.csv | 6 +
.../org/apache/carbondata/api/CarbonStore.scala | 45 +-
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 10 +-
.../org/apache/spark/sql/CarbonSource.scala | 5 +
.../TestStreamingTableOperation.scala | 740 ++++++++++++++++++-
.../processing/merger/CarbonDataMergerUtil.java | 27 +-
.../streaming/segment/StreamSegment.java | 13 +-
.../streaming/StreamSinkFactory.scala | 18 +-
.../streaming/CarbonAppendableStreamSink.scala | 8 +-
17 files changed, 1161 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 5f63cc1..014478f 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -810,6 +810,11 @@ public final class CarbonCommonConstants {
public static final String INVALID_SEGMENT_ID = "-1";
/**
+ * default load time of a segment, used when the load time is not recorded
+ */
+ public static final long SEGMENT_LOAD_TIME_DEFAULT = -1;
+
+ /**
* Size of Major Compaction in MBs
*/
@CarbonProperty
http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
index 7748d17..51a04e4 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
@@ -84,6 +84,9 @@ public class LoadMetadataDetails implements Serializable {
}
public long getLoadEndTime() {
+ if (timestamp == null) {
+ return CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT;
+ }
return convertTimeStampToLong(timestamp);
}
@@ -161,6 +164,9 @@ public class LoadMetadataDetails implements Serializable {
* @return the startLoadTime
*/
public long getLoadStartTime() {
+ if (loadStartTime == null) {
+ return CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT;
+ }
return convertTimeStampToLong(loadStartTime);
}
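With this change, getLoadStartTime() and getLoadEndTime() return the SEGMENT_LOAD_TIME_DEFAULT sentinel (-1) instead of failing on a null timestamp, which is the case for stream segments that are still open. A minimal sketch of the guard a caller needs, mirroring the CarbonStore.scala change later in this commit (load is a LoadMetadataDetails):

    val endTime =
      if (load.getLoadEndTime == CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) {
        null // stream segment is still open, no end time recorded yet
      } else {
        new java.sql.Timestamp(load.getLoadEndTime)
      }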
http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/FileFactoryImplUnitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/FileFactoryImplUnitTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/FileFactoryImplUnitTest.java
index 6fd27d3..65590d6 100644
--- a/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/FileFactoryImplUnitTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/FileFactoryImplUnitTest.java
@@ -18,8 +18,10 @@
package org.apache.carbondata.core.carbon.datastorage.filesystem.store.impl;
import java.io.File;
+import java.io.FileWriter;
import java.io.IOException;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import mockit.Mock;
@@ -147,5 +149,47 @@ public class FileFactoryImplUnitTest {
FileFactory.getDataOutputStream(filePath, FileFactory.FileType.VIEWFS);
assertNotNull(FileFactory.getCarbonFile(filePath, FileFactory.FileType.HDFS));
}
+
+ @Test public void testTruncateFile() {
+ FileWriter writer = null;
+ String path = null;
+ try {
+ // generate a 100,000-byte file (4000 writes of a 25-character string)
+ path = new File("target/truncateFile").getCanonicalPath();
+ writer = new FileWriter(path);
+ for (int i = 0; i < 4000; i++) {
+ writer.write("test truncate file method");
+ }
+ writer.close();
+ CarbonFile file = FileFactory.getCarbonFile(path);
+ assertTrue(file.getSize() == 100000L);
+
+ // truncate file to 4000 bytes
+ FileFactory.truncateFile(
+ path,
+ FileFactory.getFileType(path),
+ 4000);
+ file = FileFactory.getCarbonFile(path);
+ assertEquals(4000L, file.getSize());
+ } catch (IOException e) {
+ e.printStackTrace();
+ assertTrue(e.getMessage(), false);
+ } finally {
+ if (writer != null) {
+ try {
+ writer.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ if (path != null) {
+ try {
+ FileFactory.deleteFile(path, FileFactory.getFileType(path));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
index 1c21504..47b43c4 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
@@ -51,6 +51,24 @@ public class CarbonStreamOutputFormat extends FileOutputFormat<Void, Object> {
private static final String LOAD_Model = "mapreduce.output.carbon.load.model";
+ private static final String SEGMENT_ID = "carbon.segment.id";
+
+ /**
+ * if the byte size of the streaming segment reaches this value,
+ * the system will create a new stream segment
+ */
+ public static final String HANDOFF_SIZE = "carbon.handoff.size";
+
+ /**
+ * the minimum handoff size of a streaming segment, in bytes
+ */
+ public static final long HANDOFF_SIZE_MIN = 1024L * 1024 * 64;
+
+ /**
+ * the default handoff size of a streaming segment, in bytes
+ */
+ public static final long HANDOFF_SIZE_DEFAULT = 1024L * 1024 * 1024;
+
@Override public RecordWriter<Void, Object> getRecordWriter(TaskAttemptContext job)
throws IOException, InterruptedException {
return new CarbonStreamRecordWriter(job);
@@ -72,4 +90,14 @@ public class CarbonStreamOutputFormat extends FileOutputFormat<Void, Object> {
}
}
+ public static void setSegmentId(Configuration hadoopConf, String segmentId) throws IOException {
+ if (segmentId != null) {
+ hadoopConf.set(SEGMENT_ID, segmentId);
+ }
+ }
+
+ public static String getSegmentId(Configuration hadoopConf) throws IOException {
+ return hadoopConf.get(SEGMENT_ID);
+ }
+
}
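Because HANDOFF_SIZE is read from the job's Hadoop configuration, a streaming query can tune it per sink. A minimal sketch of passing it through the carbondata sink options, as the new test does later in this commit (df and tablePath are assumed to exist; the 128 MB value is illustrative and must be at least HANDOFF_SIZE_MIN):

    val query = df.writeStream
      .format("carbondata")
      .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
      .option("tablePath", tablePath.getPath)
      .option(CarbonStreamOutputFormat.HANDOFF_SIZE, 1024L * 1024 * 128)
      .start()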
http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
index 3c4b55c..7df87e3 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
@@ -75,6 +75,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
// parser and converter
private RowParser rowParser;
+ private BadRecordsLogger badRecordLogger;
private RowConverter converter;
private CarbonRow currentRow = new CarbonRow(null);
@@ -106,6 +107,8 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
throw new IOException(
"CarbonStreamRecordWriter require configuration: mapreduce.output.carbon.load.model");
}
+ String segmentId = CarbonStreamOutputFormat.getSegmentId(hadoopConf);
+ carbonLoadModel.setSegmentId(segmentId);
carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
int taskNo = TaskID.forName(hadoopConf.get("mapred.tip.id")).getId();
carbonLoadModel.setTaskNo("" + taskNo);
@@ -117,7 +120,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
CarbonTablePath tablePath =
CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier());
- segmentDir = tablePath.getSegmentDir("0", carbonLoadModel.getSegmentId());
+ segmentDir = tablePath.getSegmentDir("0", segmentId);
fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0");
}
@@ -138,8 +141,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
// initialize parser and converter
rowParser = new RowParserImpl(dataFields, configuration);
- BadRecordsLogger badRecordLogger =
- DataConverterProcessorStepImpl.createBadRecordLogger(configuration);
+ badRecordLogger = DataConverterProcessorStepImpl.createBadRecordLogger(configuration);
converter = new RowConverterImpl(configuration.getDataFields(), configuration, badRecordLogger);
configuration.setCardinalityFinder(converter);
converter.initialize();
@@ -285,14 +287,19 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
@Override public void close(TaskAttemptContext context) throws IOException, InterruptedException {
try {
// append remain buffer data
- if (!hasException) {
+ if (!hasException && !isFirstRow) {
appendBlockletToDataFile();
converter.finish();
}
} finally {
// close resource
CarbonUtil.closeStreams(outputStream);
- output.close();
+ if (output != null) {
+ output.close();
+ }
+ if (badRecordLogger != null) {
+ badRecordLogger.closeStreams();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
new file mode 100644
index 0000000..9970c50
--- /dev/null
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.spark.SparkHadoopWriter;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CarbonStreamInputFormatTest extends TestCase {
+
+ private TaskAttemptID taskAttemptId;
+ private TaskAttemptContext taskAttemptContext;
+ private Configuration hadoopConf;
+ private AbsoluteTableIdentifier identifier;
+ private String storePath;
+
+ @Override protected void setUp() throws Exception {
+ storePath = new File("target/stream_input").getCanonicalPath();
+ String dbName = "default";
+ String tableName = "stream_table_input";
+ identifier = new AbsoluteTableIdentifier(storePath,
+ new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
+
+ JobID jobId = SparkHadoopWriter.createJobID(new Date(), 0);
+ TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
+ taskAttemptId = new TaskAttemptID(taskId, 0);
+
+ hadoopConf = new Configuration();
+ taskAttemptContext = new TaskAttemptContextImpl(hadoopConf, taskAttemptId);
+ }
+
+ private InputSplit buildInputSplit() throws IOException {
+ CarbonInputSplit carbonInputSplit = new CarbonInputSplit();
+ List<CarbonInputSplit> splitList = new ArrayList<>();
+ splitList.add(carbonInputSplit);
+ return new CarbonMultiBlockSplit(identifier, splitList, new String[] { "localhost" },
+ FileFormat.rowformat);
+ }
+
+ @Test public void testCreateRecordReader() {
+ try {
+ InputSplit inputSplit = buildInputSplit();
+ CarbonStreamInputFormat inputFormat = new CarbonStreamInputFormat();
+ RecordReader recordReader = inputFormat.createRecordReader(inputSplit, taskAttemptContext);
+ Assert.assertNotNull("Failed to create record reader", recordReader);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.assertTrue(e.getMessage(), false);
+ }
+ }
+
+ @Override protected void tearDown() throws Exception {
+ super.tearDown();
+ if (storePath != null) {
+ FileFactory.deleteAllFilesOfDir(new File(storePath));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java
new file mode 100644
index 0000000..daa2540
--- /dev/null
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Date;
+import java.util.UUID;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.hadoop.test.util.StoreCreator;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.spark.SparkHadoopWriter;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CarbonStreamOutputFormatTest extends TestCase {
+
+ private Configuration hadoopConf;
+ private TaskAttemptID taskAttemptId;
+ private CarbonLoadModel carbonLoadModel;
+ private String storePath;
+
+ @Override protected void setUp() throws Exception {
+ super.setUp();
+ JobID jobId = SparkHadoopWriter.createJobID(new Date(), 0);
+ TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
+ taskAttemptId = new TaskAttemptID(taskId, 0);
+
+ hadoopConf = new Configuration();
+ hadoopConf.set("mapred.job.id", jobId.toString());
+ hadoopConf.set("mapred.tip.id", taskAttemptId.getTaskID().toString());
+ hadoopConf.set("mapred.task.id", taskAttemptId.toString());
+ hadoopConf.setBoolean("mapred.task.is.map", true);
+ hadoopConf.setInt("mapred.task.partition", 0);
+
+ storePath = new File("target/stream_output").getCanonicalPath();
+ String dbName = "default";
+ String tableName = "stream_table_output";
+ AbsoluteTableIdentifier identifier = new AbsoluteTableIdentifier(storePath,
+ new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
+
+ CarbonTable table = StoreCreator.createTable(identifier);
+
+ String factFilePath = new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
+ carbonLoadModel = StoreCreator.buildCarbonLoadModel(table, factFilePath, identifier);
+ }
+
+ @Test public void testSetCarbonLoadModel() {
+ try {
+ CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
+ } catch (IOException e) {
+ Assert.assertTrue("Failed to configure CarbonLoadModel for CarbonStreamOutputFormat", false);
+ }
+ }
+
+ @Test public void testGetCarbonLoadModel() {
+ try {
+ CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
+ CarbonLoadModel model = CarbonStreamOutputFormat.getCarbonLoadModel(hadoopConf);
+
+ Assert.assertNotNull("Failed to get CarbonLoadModel", model);
+ Assert.assertTrue("CarbonLoadModel should be the same as the previous one",
+ carbonLoadModel.getFactTimeStamp() == model.getFactTimeStamp());
+
+ } catch (IOException e) {
+ Assert.assertTrue("Failed to get CarbonLoadModel for CarbonStreamOutputFormat", false);
+ }
+ }
+
+ @Test public void testGetRecordWriter() {
+ CarbonStreamOutputFormat outputFormat = new CarbonStreamOutputFormat();
+ try {
+ CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
+ TaskAttemptContext taskAttemptContext =
+ new TaskAttemptContextImpl(hadoopConf, taskAttemptId);
+ RecordWriter recordWriter = outputFormat.getRecordWriter(taskAttemptContext);
+ Assert.assertNotNull("Failed to get CarbonStreamRecordWriter", recordWriter);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.assertTrue(e.getMessage(), false);
+ }
+ }
+
+ @Override protected void tearDown() throws Exception {
+ super.tearDown();
+ if (storePath != null) {
+ FileFactory.deleteAllFilesOfDir(new File(storePath));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index 29d8d03..dd48cd9 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -49,7 +49,6 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.ColumnIdentifier;
import org.apache.carbondata.core.metadata.converter.SchemaConverter;
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
-import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
@@ -116,7 +115,8 @@ public class StoreCreator {
return absoluteTableIdentifier;
}
- public static CarbonLoadModel buildCarbonLoadModel(CarbonTable table, String factFilePath) {
+ public static CarbonLoadModel buildCarbonLoadModel(CarbonTable table, String factFilePath,
+ AbsoluteTableIdentifier absoluteTableIdentifier) {
CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table);
CarbonLoadModel loadModel = new CarbonLoadModel();
loadModel.setCarbonDataLoadSchema(schema);
@@ -159,28 +159,28 @@ public class StoreCreator {
* Create store without any restructure
*/
public static void createCarbonStore() {
-
try {
-
- String factFilePath = new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
+ String factFilePath =
+ new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
File storeDir = new File(absoluteTableIdentifier.getStorePath());
CarbonUtil.deleteFoldersAndFiles(storeDir);
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS,
absoluteTableIdentifier.getStorePath());
- CarbonTable table = createTable();
+ CarbonTable table = createTable(absoluteTableIdentifier);
writeDictionary(factFilePath, table);
- CarbonLoadModel loadModel = buildCarbonLoadModel(table, factFilePath);
+ CarbonLoadModel loadModel =
+ buildCarbonLoadModel(table, factFilePath, absoluteTableIdentifier);
executeGraph(loadModel, absoluteTableIdentifier.getStorePath());
} catch (Exception e) {
e.printStackTrace();
}
-
}
- public static CarbonTable createTable() throws IOException {
+ public static CarbonTable createTable(
+ AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException {
TableInfo tableInfo = new TableInfo();
tableInfo.setStorePath(absoluteTableIdentifier.getStorePath());
tableInfo.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
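Since createTable and buildCarbonLoadModel now take the AbsoluteTableIdentifier explicitly, each test can point its table at its own store path instead of sharing the static identifier. A sketch of the new call sequence, as used in CarbonStreamOutputFormatTest above:

    val identifier = new AbsoluteTableIdentifier(storePath,
      new CarbonTableIdentifier("default", "stream_table_output", UUID.randomUUID().toString))
    val table = StoreCreator.createTable(identifier)
    val loadModel = StoreCreator.buildCarbonLoadModel(table, factFilePath, identifier)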
http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/integration/spark-common-test/src/test/resources/streamSample.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/streamSample.csv b/integration/spark-common-test/src/test/resources/streamSample.csv
new file mode 100644
index 0000000..590ea90
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/streamSample.csv
@@ -0,0 +1,6 @@
+id,name,city,salary,file
+100000001,batch_1,city_1,0.1,school_1:school_11$20
+100000002,batch_2,city_2,0.2,school_2:school_22$30
+100000003,batch_3,city_3,0.3,school_3:school_33$40
+100000004,batch_4,city_4,0.4,school_4:school_44$50
+100000005,batch_5,city_5,0.5,school_5:school_55$60
http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index a95bc01..054f778 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -64,21 +64,36 @@ object CarbonStore {
}
loadMetadataDetailsSortedArray
- .filter(_.getVisibility.equalsIgnoreCase("true"))
- .map { load =>
- val mergedTo = if (load.getMergedLoadName != null) {
- load.getMergedLoadName
- } else {
- ""
- }
- Row(
- load.getLoadName,
- load.getLoadStatus,
- new java.sql.Timestamp(load.getLoadStartTime),
- new java.sql.Timestamp(load.getLoadEndTime),
- mergedTo
- )
- }.toSeq
+ .filter(_.getVisibility.equalsIgnoreCase("true"))
+ .map { load =>
+ val mergedTo = if (load.getMergedLoadName != null) {
+ load.getMergedLoadName
+ } else {
+ ""
+ }
+
+ val startTime =
+ if (load.getLoadStartTime == CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) {
+ null
+ } else {
+ new java.sql.Timestamp(load.getLoadStartTime)
+ }
+
+ val endTime =
+ if (load.getLoadEndTime == CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) {
+ null
+ } else {
+ new java.sql.Timestamp(load.getLoadEndTime)
+ }
+
+ Row(
+ load.getLoadName,
+ load.getLoadStatus,
+ startTime,
+ endTime,
+ mergedTo
+ )
+ }.toSeq
} else {
Seq.empty
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 1f88f25..c6edc2a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.mutate.UpdateVO
import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
-import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
+import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentUpdateStatusManager}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
@@ -307,7 +307,8 @@ class CarbonMergerRDD[K, V](
// keep on assigning till last one is reached.
if (null != splits && splits.size > 0) {
- splitsOfLastSegment = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).toList.asJava
+ splitsOfLastSegment = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
+ .filter { split => FileFormat.carbondata.equals(split.getFileFormat) }.toList.asJava
}
carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter(entry => {
@@ -316,9 +317,10 @@ class CarbonMergerRDD[K, V](
entry.getLocations, entry.getLength, entry.getVersion,
updateStatusManager.getDeleteDeltaFilePath(entry.getPath.toString)
)
- ((!updated) || ((updated) && (!CarbonUtil
+ (((!updated) || ((updated) && (!CarbonUtil
.isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,
- updateDetails, updateStatusManager))))
+ updateDetails, updateStatusManager)))) &&
+ FileFormat.carbondata.equals(entry.getFileFormat))
})
}
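Both filters added here reduce to the same predicate: only columnar carbondata files participate in merging, so row-format streaming files are kept out of compaction input. As a sketch:

    // keep a split for compaction only if it is a columnar carbondata file
    def isMergeable(split: CarbonInputSplit): Boolean =
      FileFormat.carbondata.equals(split.getFileFormat)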
http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 4580f22..de01c8d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -230,6 +230,11 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
CarbonEnv.getInstance(sparkSession).carbonMetastore.
createCarbonRelation(parameters, identifier, sparkSession).tableMeta.carbonTable
+ if (!carbonTable.isStreamingTable) {
+ throw new CarbonStreamException(s"Table ${carbonTable.getDatabaseName}." +
+ s"${carbonTable.getFactTableName} is not a streaming table")
+ }
+
// create sink
StreamSinkFactory.createStreamTableSink(
sqlContext.sparkSession,
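createSink now fails fast when the target table was not created for streaming. A table passes this check when it carries the streaming table property, as in the DDL the new tests use (a sketch; table and column names are illustrative):

    sql(
      s"""
         | CREATE TABLE streaming.stream_table_socket(
         |   id INT, name STRING, city STRING, salary FLOAT
         | )
         | STORED BY 'carbondata'
         | TBLPROPERTIES('streaming'='true')
       """.stripMargin)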
http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index d5f9426..ffa977b 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -17,15 +17,30 @@
package org.apache.spark.carbondata
-import org.apache.spark.sql.catalyst.parser.ParseException
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+import java.util.concurrent.Executors
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.{CarbonEnv, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.types.StructType
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
+
+ private val spark = sqlContext.sparkSession
+ private val dataFilePath = s"$resourcesPath/streamSample.csv"
+
override def beforeAll {
sql("DROP DATABASE IF EXISTS streaming CASCADE")
sql("CREATE DATABASE streaming")
@@ -41,6 +56,52 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
| TBLPROPERTIES ('streaming' = 'true')
""".stripMargin)
sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO TABLE source""")
+
+ dropTable()
+
+ // 1. normal table does not support streaming ingest
+ createTable(tableName = "batch_table", streaming = false, withBatchLoad = true)
+
+ // 2. streaming table with different input source
+ // socket source
+ createTable(tableName = "stream_table_socket", streaming = true, withBatchLoad = true)
+ // file source
+ createTable(tableName = "stream_table_file", streaming = true, withBatchLoad = true)
+
+ // 3. streaming table with bad records
+ createTable(tableName = "bad_record_force", streaming = true, withBatchLoad = true)
+ createTable(tableName = "bad_record_fail", streaming = true, withBatchLoad = true)
+
+ // 4. streaming frequency check
+ createTable(tableName = "stream_table_1s", streaming = true, withBatchLoad = true)
+ createTable(tableName = "stream_table_10s", streaming = true, withBatchLoad = true)
+
+ // 5. streaming table execute batch loading
+ createTable(tableName = "stream_table_batch", streaming = true, withBatchLoad = false)
+
+ // 6. detail query
+ // full scan
+ createTable(tableName = "stream_table_scan", streaming = true, withBatchLoad = true)
+ createTableWithComplexType(
+ tableName = "stream_table_scan_complex", streaming = true, withBatchLoad = true)
+ // filter scan
+ createTable(tableName = "stream_table_filter", streaming = true, withBatchLoad = true)
+ createTableWithComplexType(
+ tableName = "stream_table_filter_complex", streaming = true, withBatchLoad = true)
+
+ // 7. aggregate query
+ createTable(tableName = "stream_table_agg", streaming = true, withBatchLoad = true)
+ createTableWithComplexType(
+ tableName = "stream_table_agg_complex", streaming = true, withBatchLoad = true)
+
+ // 8. compaction
+ createTable(tableName = "stream_table_compact", streaming = true, withBatchLoad = true)
+
+ // 9. create new stream segment if current stream segment is full
+ createTable(tableName = "stream_table_new", streaming = true, withBatchLoad = true)
+
+ // 10. fault tolerant
+ createTable(tableName = "stream_table_tolerant", streaming = true, withBatchLoad = true)
}
test("validate streaming property") {
@@ -96,7 +157,682 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
}
override def afterAll {
+ dropTable()
sql("USE default")
sql("DROP DATABASE IF EXISTS streaming CASCADE")
}
+
+ def dropTable(): Unit = {
+ sql("drop table if exists streaming.batch_table")
+ sql("drop table if exists streaming.stream_table_socket")
+ sql("drop table if exists streaming.stream_table_file")
+ sql("drop table if exists streaming.bad_record_force")
+ sql("drop table if exists streaming.bad_record_fail")
+ sql("drop table if exists streaming.stream_table_1s")
+ sql("drop table if exists streaming.stream_table_10s")
+ sql("drop table if exists streaming.stream_table_batch")
+ sql("drop table if exists streaming.stream_table_scan")
+ sql("drop table if exists streaming.stream_table_scan_complex")
+ sql("drop table if exists streaming.stream_table_filter")
+ sql("drop table if exists streaming.stream_table_filter_complex")
+ sql("drop table if exists streaming.stream_table_agg")
+ sql("drop table if exists streaming.stream_table_agg_complex")
+ sql("drop table if exists streaming.stream_table_compact")
+ sql("drop table if exists streaming.stream_table_new")
+ sql("drop table if exists streaming.stream_table_tolerant")
+ }
+
+ // normal table does not support streaming ingest
+ test("normal table does not support streaming ingest") {
+ val identifier = new TableIdentifier("batch_table", Option("streaming"))
+ val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+ .asInstanceOf[CarbonRelation].metaData.carbonTable
+ val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+ var server: ServerSocket = null
+ try {
+ server = new ServerSocket(7071)
+ val thread1 = createWriteSocketThread(server, 2, 10, 1)
+ thread1.start()
+ // use thread pool to catch the exception of sink thread
+ val pool = Executors.newSingleThreadExecutor()
+ val thread2 = createSocketStreamingThread(spark, tablePath)
+ val future = pool.submit(thread2)
+ Thread.sleep(1000)
+ thread1.interrupt()
+ try {
+ future.get()
+ assert(false)
+ } catch {
+ case ex =>
+ assert(ex.getMessage.contains("is not a streaming table"))
+ }
+ } finally {
+ if (server != null) {
+ server.close()
+ }
+ }
+ }
+
+ // input source: socket
+ test("streaming ingest from socket source") {
+ executeStreamingIngest(
+ tableName = "stream_table_socket",
+ batchNums = 2,
+ rowNumsEachBatch = 10,
+ intervalOfSource = 1,
+ intervalOfIngest = 1,
+ continueSeconds = 10,
+ generateBadRecords = false,
+ badRecordAction = "force"
+ )
+
+ checkAnswer(
+ sql("select count(*) from streaming.stream_table_socket"),
+ Seq(Row(25))
+ )
+ }
+
+ // input source: file
+ test("streaming ingest from file source") {
+ val identifier = new TableIdentifier("stream_table_file", Option("streaming"))
+ val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+ .asInstanceOf[CarbonRelation].metaData.carbonTable
+ val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+ val csvDataDir = new File("target/csvdata").getCanonicalPath
+ // streaming ingest 10 rows
+ generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir)
+ val thread = createFileStreamingThread(spark, tablePath, csvDataDir, intervalSecond = 1)
+ thread.start()
+ Thread.sleep(2000)
+ generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir)
+ Thread.sleep(10000)
+ thread.interrupt()
+ checkAnswer(
+ sql("select count(*) from streaming.stream_table_file"),
+ Seq(Row(25))
+ )
+ }
+
+ // bad records
+ test("streaming table with bad records action: force") {
+ executeStreamingIngest(
+ tableName = "bad_record_force",
+ batchNums = 2,
+ rowNumsEachBatch = 10,
+ intervalOfSource = 1,
+ intervalOfIngest = 1,
+ continueSeconds = 10,
+ generateBadRecords = true,
+ badRecordAction = "force"
+ )
+ checkAnswer(
+ sql("select count(*) from streaming.stream_table_socket"),
+ Seq(Row(25))
+ )
+
+ }
+
+ test("streaming table with bad records action: fail") {
+ executeStreamingIngest(
+ tableName = "bad_record_fail",
+ batchNums = 2,
+ rowNumsEachBatch = 10,
+ intervalOfSource = 1,
+ intervalOfIngest = 1,
+ continueSeconds = 10,
+ generateBadRecords = true,
+ badRecordAction = "fail"
+ )
+ val result = sql("select count(*) from streaming.bad_record_fail").collect()
+ assert(result(0).getLong(0) < 25)
+ }
+
+ // ingest with different intervals
+ test("1 row per 1 second interval") {
+ executeStreamingIngest(
+ tableName = "stream_table_1s",
+ batchNums = 20,
+ rowNumsEachBatch = 1,
+ intervalOfSource = 1,
+ intervalOfIngest = 1,
+ continueSeconds = 20,
+ generateBadRecords = false,
+ badRecordAction = "force"
+ )
+ val result = sql("select count(*) from streaming.stream_table_1s").collect()
+ // 20 seconds is not enough to ingest all the data, there is some ingest delay
+ assert(result(0).getLong(0) > 5 + 10)
+ }
+
+ test("10000 row per 10 seconds interval") {
+ executeStreamingIngest(
+ tableName = "stream_table_10s",
+ batchNums = 5,
+ rowNumsEachBatch = 10000,
+ intervalOfSource = 5,
+ intervalOfIngest = 10,
+ continueSeconds = 50,
+ generateBadRecords = false,
+ badRecordAction = "force"
+ )
+ checkAnswer(
+ sql("select count(*) from streaming.stream_table_10s"),
+ Seq(Row(5 + 10000*5)))
+ }
+
+ // batch loading on streaming table
+ test("streaming table execute batch loading") {
+ executeStreamingIngest(
+ tableName = "stream_table_batch",
+ batchNums = 5,
+ rowNumsEachBatch = 100,
+ intervalOfSource = 3,
+ intervalOfIngest = 5,
+ continueSeconds = 20,
+ generateBadRecords = false,
+ badRecordAction = "force"
+ )
+ checkAnswer(
+ sql("select count(*) from streaming.stream_table_batch"),
+ Seq(Row(100*5)))
+
+ executeBatchLoad("stream_table_batch")
+
+ checkAnswer(
+ sql("select count(*) from streaming.stream_table_batch"),
+ Seq(Row(100*5 + 5)))
+ }
+
+ // detail query on batch and stream segment
+ test("non-filter query on stream table with dictionary, sort_columns") {
+ executeStreamingIngest(
+ tableName = "stream_table_scan",
+ batchNums = 5,
+ rowNumsEachBatch = 10,
+ intervalOfSource = 2,
+ intervalOfIngest = 4,
+ continueSeconds = 20,
+ generateBadRecords = false,
+ badRecordAction = "force"
+ )
+
+ val result = sql("select * from streaming.stream_table_scan order by id").collect()
+ assert(result != null)
+ assert(result.length == 55)
+ // check one row of streaming data
+ assert(result(0).getInt(0) == 1)
+ assert(result(0).getString(1) == "name_1")
+ // check one row of batch loading
+ assert(result(50).getInt(0) == 100000001)
+ assert(result(50).getString(1) == "batch_1")
+ }
+
+ test("non-filter query on stream table with dictionary, sort_columns and complex column") {
+ executeStreamingIngest(
+ tableName = "stream_table_scan_complex",
+ batchNums = 5,
+ rowNumsEachBatch = 10,
+ intervalOfSource = 2,
+ intervalOfIngest = 4,
+ continueSeconds = 20,
+ generateBadRecords = false,
+ badRecordAction = "force"
+ )
+
+ val result = sql("select * from streaming.stream_table_scan_complex order by id").collect()
+ assert(result != null)
+ assert(result.length == 55)
+ // check one row of streaming data
+ assert(result(0).getInt(0) == 1)
+ assert(result(0).getString(1) == "name_1")
+ assert(result(0).getStruct(4).getInt(1) == 1)
+ // check one row of batch loading
+ assert(result(50).getInt(0) == 100000001)
+ assert(result(50).getString(1) == "batch_1")
+ assert(result(50).getStruct(4).getInt(1) == 20)
+ }
+
+ test("filter query on stream table with dictionary, sort_columns") {
+ executeStreamingIngest(
+ tableName = "stream_table_filter",
+ batchNums = 5,
+ rowNumsEachBatch = 10,
+ intervalOfSource = 2,
+ intervalOfIngest = 4,
+ continueSeconds = 20,
+ generateBadRecords = true,
+ badRecordAction = "force"
+ )
+
+ checkAnswer(
+ sql("select * from stream_table_filter where id = 1"),
+ Seq(Row(1, "name_1", "city_1", 10000.0)))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where name = 'name_2'"),
+ Seq(Row(2, "name_2", "", 20000.0)))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where city = 'city_1'"),
+ Seq(Row(1, "name_1", "city_1", 10000.0),
+ Row(100000001, "batch_1", "city_1", 0.1)))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where id > 49 and id < 100000002"),
+ Seq(Row(50, "name_50", "city_50", 500000.0),
+ Row(100000001, "batch_1", "city_1", 0.1)))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where id is null"),
+ Seq(Row(null, "name_6", "city_6", 60000.0)))
+
+ checkAnswer(
+ sql("select * from stream_table_filter where city = ''"),
+ Seq(Row(2, "name_2", "", 20000.0)))
+
+ }
+
+ test("filter query on stream table with dictionary, sort_columns and complex column") {
+ executeStreamingIngest(
+ tableName = "stream_table_filter_complex",
+ batchNums = 5,
+ rowNumsEachBatch = 10,
+ intervalOfSource = 2,
+ intervalOfIngest = 4,
+ continueSeconds = 20,
+ generateBadRecords = true,
+ badRecordAction = "force"
+ )
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where id = 1"),
+ Seq(Row(1, "name_1", "city_1", 10000.0, Row(wrap(Array("school_1", "school_11")), 1))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where name = 'name_2'"),
+ Seq(Row(2, "name_2", "", 20000.0, Row(wrap(Array("school_2", "school_22")), 2))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where file.age = 3"),
+ Seq(Row(3, "name_3", "city_3", 30000.0, Row(wrap(Array("school_3", "school_33")), 3))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where city = 'city_1'"),
+ Seq(Row(1, "name_1", "city_1", 10000.0, Row(wrap(Array("school_1", "school_11")), 1)),
+ Row(100000001, "batch_1", "city_1", 0.1, Row(wrap(Array("school_1", "school_11")), 20))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where id > 49 and id < 100000002"),
+ Seq(Row(50, "name_50", "city_50", 500000.0, Row(wrap(Array("school_50", "school_5050")), 50)),
+ Row(100000001, "batch_1", "city_1", 0.1, Row(wrap(Array("school_1", "school_11")), 20))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where id is null"),
+ Seq(Row(null, "name_6", "city_6", 60000.0, Row(wrap(Array("school_6", "school_66")), 6))))
+
+ checkAnswer(
+ sql("select * from stream_table_filter_complex where city = ''"),
+ Seq(Row(2, "name_2", "", 20000.0, Row(wrap(Array("school_2", "school_22")), 2))))
+
+ }
+
+ // aggregation
+ test("aggregation query") {
+ executeStreamingIngest(
+ tableName = "stream_table_agg",
+ batchNums = 5,
+ rowNumsEachBatch = 10,
+ intervalOfSource = 2,
+ intervalOfIngest = 4,
+ continueSeconds = 20,
+ generateBadRecords = true,
+ badRecordAction = "force"
+ )
+
+ checkAnswer(
+ sql("select count(*), max(id), min(name), cast(avg(id) as integer), sum(id) " +
+ "from stream_table_agg where id >= 2 and id <= 100000004"),
+ Seq(Row(52, 100000004, "batch_1", 7692332, 400001278)))
+
+ checkAnswer(
+ sql("select city, count(id), sum(id), cast(avg(id) as integer), " +
+ "max(salary), min(salary) " +
+ "from stream_table_agg " +
+ "where name in ('batch_1', 'batch_2', 'batch_3', 'name_1', 'name_2', 'name_3') " +
+ "and city <> '' " +
+ "group by city " +
+ "order by city"),
+ Seq(Row("city_1", 2, 100000002, 50000001, 10000.0, 0.1),
+ Row("city_2", 1, 100000002, 100000002, 0.2, 0.2),
+ Row("city_3", 2, 100000006, 50000003, 30000.0, 0.3)))
+ }
+
+ test("aggregation query with complex") {
+ executeStreamingIngest(
+ tableName = "stream_table_agg_complex",
+ batchNums = 5,
+ rowNumsEachBatch = 10,
+ intervalOfSource = 2,
+ intervalOfIngest = 4,
+ continueSeconds = 20,
+ generateBadRecords = true,
+ badRecordAction = "force"
+ )
+
+ checkAnswer(
+ sql("select count(*), max(id), min(name), cast(avg(file.age) as integer), sum(file.age) " +
+ "from stream_table_agg_complex where id >= 2 and id <= 100000004"),
+ Seq(Row(52, 100000004, "batch_1", 27, 1408)))
+
+ checkAnswer(
+ sql("select city, count(id), sum(id), cast(avg(file.age) as integer), " +
+ "max(salary), min(salary) " +
+ "from stream_table_agg_complex " +
+ "where name in ('batch_1', 'batch_2', 'batch_3', 'name_1', 'name_2', 'name_3') " +
+ "and city <> '' " +
+ "group by city " +
+ "order by city"),
+ Seq(Row("city_1", 2, 100000002, 10, 10000.0, 0.1),
+ Row("city_2", 1, 100000002, 30, 0.2, 0.2),
+ Row("city_3", 2, 100000006, 21, 30000.0, 0.3)))
+ }
+
+ // compaction
+ test("test compaction on stream table") {
+ executeStreamingIngest(
+ tableName = "stream_table_compact",
+ batchNums = 5,
+ rowNumsEachBatch = 10,
+ intervalOfSource = 2,
+ intervalOfIngest = 4,
+ continueSeconds = 20,
+ generateBadRecords = false,
+ badRecordAction = "force"
+ )
+ for (_ <- 0 to 3) {
+ executeBatchLoad("stream_table_compact")
+ }
+
+ sql("alter table streaming.stream_table_compact compact 'minor'")
+
+ val result = sql("show segments for table streaming.stream_table_compact").collect()
+ result.foreach { row =>
+ if (row.getString(0).equals("1")) {
+ assert(row.getString(1).equals(CarbonCommonConstants.STORE_LOADSTATUS_STREAMING))
+ }
+ }
+ }
+
+ // stream segment max size
+ test("create new stream segment if current stream segment is full") {
+ executeStreamingIngest(
+ tableName = "stream_table_new",
+ batchNums = 6,
+ rowNumsEachBatch = 10000,
+ intervalOfSource = 5,
+ intervalOfIngest = 10,
+ continueSeconds = 40,
+ generateBadRecords = false,
+ badRecordAction = "force",
+ handoffSize = 1024L * 200
+ )
+ sql("show segments for table streaming.stream_table_new").show(100, false)
+
+ assert(sql("show segments for table streaming.stream_table_new").count() == 4)
+
+ checkAnswer(
+ sql("select count(*) from streaming.stream_table_new"),
+ Seq(Row(5 + 10000 * 6))
+ )
+ }
+
+ def createWriteSocketThread(
+ serverSocket: ServerSocket,
+ writeNums: Int,
+ rowNums: Int,
+ intervalSecond: Int,
+ badRecords: Boolean = false): Thread = {
+ new Thread() {
+ override def run(): Unit = {
+ // wait for the client connection request and accept it
+ val clientSocket = serverSocket.accept()
+ val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+ var index = 0
+ for (_ <- 1 to writeNums) {
+ // write rowNums records per iteration
+ for (_ <- 1 to rowNums) {
+ index = index + 1
+ if (badRecords) {
+ if (index == 2) {
+ // null value
+ socketWriter.println(index.toString + ",name_" + index
+ + ",," + (10000.00 * index).toString +
+ ",school_" + index + ":school_" + index + index + "$" + index)
+ } else if (index == 6) {
+ // illegal number
+ socketWriter.println(index.toString + "abc,name_" + index
+ + ",city_" + index + "," + (10000.00 * index).toString +
+ ",school_" + index + ":school_" + index + index + "$" + index)
+ } else {
+ socketWriter.println(index.toString + ",name_" + index
+ + ",city_" + index + "," + (10000.00 * index).toString +
+ ",school_" + index + ":school_" + index + index + "$" + index)
+ }
+ } else {
+ socketWriter.println(index.toString + ",name_" + index
+ + ",city_" + index + "," + (10000.00 * index).toString +
+ ",school_" + index + ":school_" + index + index + "$" + index)
+ }
+ }
+ socketWriter.flush()
+ Thread.sleep(1000 * intervalSecond)
+ }
+ socketWriter.close()
+ }
+ }
+ }
+
+ def createSocketStreamingThread(
+ spark: SparkSession,
+ tablePath: CarbonTablePath,
+ badRecordAction: String = "force",
+ intervalSecond: Int = 2,
+ handoffSize: Long = CarbonStreamOutputFormat.HANDOFF_SIZE_DEFAULT): Thread = {
+ new Thread() {
+ override def run(): Unit = {
+ var qry: StreamingQuery = null
+ try {
+ val readSocketDF = spark.readStream
+ .format("socket")
+ .option("host", "localhost")
+ .option("port", 7071)
+ .load()
+
+ // Write data from socket stream to carbondata file
+ qry = readSocketDF.writeStream
+ .format("carbondata")
+ .trigger(ProcessingTime(s"$intervalSecond seconds"))
+ .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+ .option("tablePath", tablePath.getPath)
+ .option("bad_records_action", badRecordAction)
+ .option(CarbonStreamOutputFormat.HANDOFF_SIZE, handoffSize)
+ .start()
+ qry.awaitTermination()
+ } catch {
+ case ex =>
+ throw new Exception(ex.getMessage)
+ } finally {
+ if (null != qry) {
+ qry.stop()
+ }
+ }
+ }
+ }
+ }
+
+ def executeStreamingIngest(
+ tableName: String,
+ batchNums: Int,
+ rowNumsEachBatch: Int,
+ intervalOfSource: Int,
+ intervalOfIngest: Int,
+ continueSeconds: Int,
+ generateBadRecords: Boolean,
+ badRecordAction: String,
+ handoffSize: Long = CarbonStreamOutputFormat.HANDOFF_SIZE_DEFAULT
+ ): Unit = {
+ val identifier = new TableIdentifier(tableName, Option("streaming"))
+ val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+ .asInstanceOf[CarbonRelation].metaData.carbonTable
+ val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+ var server: ServerSocket = null
+ try {
+ server = new ServerSocket(7071)
+
+ val thread1 = createWriteSocketThread(
+ serverSocket = server,
+ writeNums = batchNums,
+ rowNums = rowNumsEachBatch,
+ intervalSecond = intervalOfSource,
+ badRecords = generateBadRecords)
+ val thread2 = createSocketStreamingThread(
+ spark = spark,
+ tablePath = tablePath,
+ badRecordAction = badRecordAction,
+ intervalSecond = intervalOfIngest,
+ handoffSize = handoffSize)
+ thread1.start()
+ thread2.start()
+ Thread.sleep(continueSeconds * 1000)
+ thread1.interrupt()
+ thread2.interrupt()
+ } finally {
+ if (null != server) {
+ server.close()
+ }
+ }
+ }
+
+ def generateCSVDataFile(
+ spark: SparkSession,
+ idStart: Int,
+ rowNums: Int,
+ csvDirPath: String): Unit = {
+ // Create csv data frame file
+ val csvRDD = spark.sparkContext.parallelize(idStart until idStart + rowNums)
+ .map { id =>
+ (id,
+ "name_" + id,
+ "city_" + id,
+ 10000.00 * id,
+ "school_" + id + ":school_" + id + id + "$" + id)
+ }
+ val csvDataDF = spark.createDataFrame(csvRDD).toDF("id", "name", "city", "salary", "file")
+
+ csvDataDF.write
+ .option("header", "false")
+ .mode(SaveMode.Overwrite)
+ .csv(csvDirPath)
+ }
+
+ def createFileStreamingThread(
+ spark: SparkSession,
+ tablePath: CarbonTablePath,
+ csvDataDir: String,
+ intervalSecond: Int): Thread = {
+ new Thread() {
+ override def run(): Unit = {
+ val inputSchema = new StructType()
+ .add("id", "integer")
+ .add("name", "string")
+ .add("city", "string")
+ .add("salary", "float")
+ .add("file", "string")
+ var qry: StreamingQuery = null
+ try {
+ val readCsvDF = spark.readStream
+ .format("csv")
+ .option("sep", ",")
+ .schema(inputSchema)
+ .option("path", csvDataDir)
+ .option("header", "false")
+ .load()
+
+ // Write data from the file stream to carbondata files
+ qry = readCsvDF.writeStream
+ .format("carbondata")
+ .trigger(ProcessingTime(s"${ intervalSecond } seconds"))
+ .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+ .option("tablePath", tablePath.getPath)
+ .start()
+
+ qry.awaitTermination()
+ } catch {
+ case _: InterruptedException =>
+ println("Done reading and writing streaming data")
+ } finally {
+ if (qry != null) {
+ qry.stop()
+ }
+ }
+ }
+ }
+ }
+
+ def createTable(tableName: String, streaming: Boolean, withBatchLoad: Boolean): Unit = {
+ sql(
+ s"""
+ | CREATE TABLE streaming.$tableName(
+ | id INT,
+ | name STRING,
+ | city STRING,
+ | salary FLOAT
+ | )
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" }
+ | 'sort_columns'='name', 'dictionary_include'='city')
+ | """.stripMargin)
+
+ if (withBatchLoad) {
+ // batch loading 5 rows
+ executeBatchLoad(tableName)
+ }
+ }
+
+ def createTableWithComplexType(
+ tableName: String,
+ streaming: Boolean,
+ withBatchLoad: Boolean): Unit = {
+ sql(
+ s"""
+ | CREATE TABLE streaming.$tableName(
+ | id INT,
+ | name STRING,
+ | city STRING,
+ | salary FLOAT,
+ | file struct<school:array<string>, age:int>
+ | )
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" }
+ | 'sort_columns'='name', 'dictionary_include'='city')
+ | """.stripMargin)
+
+ if (withBatchLoad) {
+ // batch loading 5 rows
+ executeBatchLoad(tableName)
+ }
+ }
+
+ def executeBatchLoad(tableName: String): Unit = {
+ sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$dataFilePath'
+ | INTO TABLE streaming.$tableName
+ | OPTIONS('HEADER'='true')
+ """.stripMargin)
+ }
+
+ def wrap(array: Array[String]) = {
+ new mutable.WrappedArray.ofRef(array)
+ }
}
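Condensed, the socket-source path these tests exercise is: a ServerSocket feeding CSV-like rows, a socket readStream, and a carbondata writeStream against the table path. A minimal end-to-end sketch of that flow (the port and trigger interval are the test's choices, not requirements):

    val server = new ServerSocket(7071)
    // a producer thread writes rows to the accepted client socket, see createWriteSocketThread
    val readSocketDF = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 7071)
      .load()
    val qry = readSocketDF.writeStream
      .format("carbondata")
      .trigger(ProcessingTime("1 seconds"))
      .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
      .option("tablePath", tablePath.getPath)
      .start()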
http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 53add22..45fcbb0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -478,6 +478,12 @@ public final class CarbonDataMergerUtil {
Date segDate1 = null;
SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
for (LoadMetadataDetails segment : listOfSegmentsBelowThresholdSize) {
+ // compaction should skip streaming segments
+ if (CarbonCommonConstants.STORE_LOADSTATUS_STREAMING.equals(segment.getLoadStatus()) ||
+ CarbonCommonConstants.STORE_LOADSTATUS_STREAMING_FINISH.equals(
+ segment.getLoadStatus())) {
+ continue;
+ }
if (first) {
segDate1 = initializeFirstSegment(loadsOfSameDate, segment, sdf);
@@ -506,7 +512,15 @@ public final class CarbonDataMergerUtil {
}
}
} else {
- return listOfSegmentsBelowThresholdSize;
+ for (LoadMetadataDetails segment : listOfSegmentsBelowThresholdSize) {
+ // compaction should skip streaming segments
+ if (CarbonCommonConstants.STORE_LOADSTATUS_STREAMING.equals(segment.getLoadStatus()) ||
+ CarbonCommonConstants.STORE_LOADSTATUS_STREAMING_FINISH.equals(
+ segment.getLoadStatus())) {
+ continue;
+ }
+ loadsOfSameDate.add(segment);
+ }
}
return loadsOfSameDate;
@@ -583,6 +597,11 @@ public final class CarbonDataMergerUtil {
// check size of each segment , sum it up across partitions
for (LoadMetadataDetails segment : listOfSegmentsAfterPreserve) {
+ // compaction should skip streaming segments
+ if (CarbonCommonConstants.STORE_LOADSTATUS_STREAMING.equals(segment.getLoadStatus()) ||
+ CarbonCommonConstants.STORE_LOADSTATUS_STREAMING_FINISH.equals(segment.getLoadStatus())) {
+ continue;
+ }
String segId = segment.getLoadName();
// variable to store one segment size across partition.
@@ -688,7 +707,11 @@ public final class CarbonDataMergerUtil {
// check size of each segment , sum it up across partitions
for (LoadMetadataDetails segment : listOfSegmentsAfterPreserve) {
-
+ // compaction should skip streaming segments
+ if (CarbonCommonConstants.STORE_LOADSTATUS_STREAMING.equals(segment.getLoadStatus()) ||
+ CarbonCommonConstants.STORE_LOADSTATUS_STREAMING_FINISH.equals(segment.getLoadStatus())) {
+ continue;
+ }
String segName = segment.getLoadName();
// if a segment is already merged 2 levels then its name will become .2
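The same two-status check is now repeated in four loops; a hypothetical helper (not part of this commit) that captures the predicate:

    // hypothetical helper: true if the segment belongs to streaming ingest
    // (still open or already finished) and must be skipped by compaction
    def isStreamingSegment(segment: LoadMetadataDetails): Boolean =
      CarbonCommonConstants.STORE_LOADSTATUS_STREAMING.equals(segment.getLoadStatus) ||
        CarbonCommonConstants.STORE_LOADSTATUS_STREAMING_FINISH.equals(segment.getLoadStatus)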
http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index 32ba332..d65f24d 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -53,8 +53,6 @@ public class StreamSegment {
private static final LogService LOGGER =
LogServiceFactory.getLogService(StreamSegment.class.getName());
- public static final long STREAM_SEGMENT_MAX_SIZE = 1024L * 1024 * 1024;
-
/**
* get stream segment or create new stream segment if not exists
*/
@@ -136,7 +134,8 @@ public class StreamSegment {
"Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
+ " for stream table finish segment");
- LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(tablePath.getPath());
+ LoadMetadataDetails[] details =
+ SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath());
for (LoadMetadataDetails detail : details) {
if (segmentId.equals(detail.getLoadName())) {
detail.setLoadEndTime(System.currentTimeMillis());
@@ -187,7 +186,7 @@ public class StreamSegment {
TaskAttemptContext job) throws Exception {
CarbonStreamRecordWriter writer = null;
try {
- writer = (CarbonStreamRecordWriter)new CarbonStreamOutputFormat().getRecordWriter(job);
+ writer = (CarbonStreamRecordWriter) new CarbonStreamOutputFormat().getRecordWriter(job);
// at the begin of each task, should recover file if necessary
// here can reuse some information of record writer
recoverFileIfRequired(
@@ -199,6 +198,12 @@ public class StreamSegment {
writer.write(null, inputIterators.next());
}
inputIterators.close();
+ } catch (Exception ex) {
+ if (writer != null) {
+ LOGGER.error(ex, "Failed to append batch data to stream segment: " +
+ writer.getSegmentDir());
+ }
+ throw ex;
} finally {
if (writer != null) {
writer.close(job);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index 3ac19d9..31ed1f6 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -23,7 +23,6 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, Sink}
-import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.dictionary.server.DictionaryServer
@@ -31,6 +30,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.spark.util.DataLoadingUtil
import org.apache.carbondata.streaming.segment.StreamSegment
@@ -76,7 +76,21 @@ object StreamSinkFactory {
}
private def validateParameters(parameters: Map[String, String]): Unit = {
- // TODO require to validate parameters
+ val segmentSize = parameters.get(CarbonStreamOutputFormat.HANDOFF_SIZE)
+ if (segmentSize.isDefined) {
+ try {
+ val value = java.lang.Long.parseLong(segmentSize.get)
+ if (value < CarbonStreamOutputFormat.HANDOFF_SIZE_MIN) {
+ throw new CarbonStreamException(CarbonStreamOutputFormat.HANDOFF_SIZE +
+ " should be greater than or equal to " +
+ CarbonStreamOutputFormat.HANDOFF_SIZE_MIN)
+ }
+ } catch {
+ case ex: NumberFormatException =>
+ throw new CarbonStreamException(CarbonStreamOutputFormat.HANDOFF_SIZE +
+ s" ${segmentSize.get} is an illegal number")
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/40c31e80/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 844423a..b3f0964 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -68,6 +68,10 @@ class CarbonAppendableStreamSink(
}
conf
}
+ // segment max size (bytes)
+ private val segmentMaxSize = hadoopConf.getLong(
+ CarbonStreamOutputFormat.HANDOFF_SIZE,
+ CarbonStreamOutputFormat.HANDOFF_SIZE_DEFAULT)
override def addBatch(batchId: Long, data: DataFrame): Unit = {
if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
@@ -107,8 +111,7 @@ class CarbonAppendableStreamSink(
private def checkOrHandOffSegment(): Unit = {
val segmentDir = carbonTablePath.getSegmentDir("0", currentSegmentId)
val fileType = FileFactory.getFileType(segmentDir)
- if (StreamSegment.STREAM_SEGMENT_MAX_SIZE <=
- StreamSegment.size(segmentDir)) {
+ if (segmentMaxSize <= StreamSegment.size(segmentDir)) {
val newSegmentId =
StreamSegment.close(carbonTable, currentSegmentId)
currentSegmentId = newSegmentId
@@ -238,6 +241,7 @@ object CarbonAppendableStreamSink {
val taskAttemptContext: TaskAttemptContext = {
// Set up the configuration object
val hadoopConf = description.serializableHadoopConf.value
+ CarbonStreamOutputFormat.setSegmentId(hadoopConf, description.segmentId)
hadoopConf.set("mapred.job.id", jobId.toString)
hadoopConf.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
hadoopConf.set("mapred.task.id", taskAttemptId.toString)
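Taken together, the sink-side flow is: each micro-batch appends to the current stream segment, and once the segment directory reaches the configured handoff size the sink closes that segment and switches to a new one. With the 200 KB handoff size used in the "create new stream segment" test above, ingesting 60000 rows spreads the stream data across multiple segments (the test asserts four segments in total). A restated sketch of the decision, with names as in the diff above:

    // close the current stream segment and switch to a new one when it is full
    if (segmentMaxSize <= StreamSegment.size(segmentDir)) {
      currentSegmentId = StreamSegment.close(carbonTable, currentSegmentId)
    }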