Posted to commits@carbondata.apache.org by qi...@apache.org on 2018/09/13 08:44:19 UTC
carbondata git commit: [CARBONDATA-2919] Support ingest from Kafka in StreamSQL
Repository: carbondata
Updated Branches:
refs/heads/master 4c692d185 -> 560bfbe77
[CARBONDATA-2919] Support ingest from Kafka in StreamSQL
This closes #2695
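
In short, this patch lets StreamSQL ingest from a Kafka topic: the topic is declared as a streaming source table and the ingest job is driven entirely by SQL. A hedged end-to-end sketch of the usage the diff below enables; the broker address, topic, and table names are illustrative, not taken from the patch:

    // Kafka source table. 'kafka.bootstrap.servers' and 'subscribe' are
    // mandatory for 'format'='kafka' (enforced by shouldHaveProperty in the diff).
    spark.sql(
      """
        | CREATE TABLE source(id INT, name STRING)
        | STORED AS carbondata
        | TBLPROPERTIES(
        |   'streaming'='source',
        |   'format'='kafka',
        |   'kafka.bootstrap.servers'='localhost:9092',
        |   'subscribe'='my_topic')
      """.stripMargin)

    // Start the ingest job. Each Kafka record's 'value' is parsed as
    // delimiter-separated text ('delimiter' tblproperty, default ',').
    spark.sql(
      """
        | CREATE STREAM job123 ON TABLE sink
        | STMPROPERTIES('trigger'='ProcessingTime', 'interval'='3 seconds')
        | AS SELECT * FROM source
      """.stripMargin)

    // Stop the job.
    spark.sql("DROP STREAM job123")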
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/560bfbe7
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/560bfbe7
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/560bfbe7
Branch: refs/heads/master
Commit: 560bfbe778a29bad285a6388686d888a3e877e33
Parents: 4c692d1
Author: Jacky Li <ja...@qq.com>
Authored: Tue Sep 11 12:17:55 2018 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Thu Sep 13 16:42:20 2018 +0800
----------------------------------------------------------------------
.../core/locks/AbstractCarbonLock.java | 10 +
.../carbondata/core/locks/HdfsFileLock.java | 4 -
.../carbondata/core/locks/ICarbonLock.java | 5 +
.../carbondata/core/locks/LocalFileLock.java | 5 -
.../carbondata/core/locks/S3FileLock.java | 4 -
.../core/metadata/schema/table/CarbonTable.java | 8 +
.../carbondata/core/util/SessionParams.java | 2 +
datamap/mv/plan/pom.xml | 6 +
examples/spark2/pom.xml | 4 +
.../carbondata/examples/StreamSQLExample.scala | 107 ++++++++++
.../examples/StructuredStreamingExample.scala | 1 +
.../org/apache/carbondata/spark/util/Util.java | 11 +
.../org/apache/carbondata/api/CarbonStore.scala | 43 ++--
.../carbondata/spark/rdd/StreamHandoffRDD.scala | 2 +
.../streaming/CarbonAppendableStreamSink.scala | 1 +
.../datasources/CarbonSparkDataSourceUtil.scala | 16 +-
integration/spark2/pom.xml | 6 +
.../carbondata/spark/util/CarbonSparkUtil.scala | 10 +
.../carbondata/stream/StreamJobManager.scala | 43 ++--
.../CarbonAlterTableCompactionCommand.scala | 5 +-
.../management/CarbonShowLoadsCommand.scala | 10 +-
.../stream/CarbonCreateStreamCommand.scala | 213 ++++++++++++++++---
.../TestStreamingTableOperation.scala | 21 +-
pom.xml | 6 +
.../loading/parser/impl/RowParserImpl.java | 3 +
.../streaming/segment/StreamSegment.java | 2 +-
26 files changed, 451 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/core/src/main/java/org/apache/carbondata/core/locks/AbstractCarbonLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/AbstractCarbonLock.java b/core/src/main/java/org/apache/carbondata/core/locks/AbstractCarbonLock.java
index 4aa0a18..37f77b2 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/AbstractCarbonLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/AbstractCarbonLock.java
@@ -30,8 +30,18 @@ public abstract class AbstractCarbonLock implements ICarbonLock {
private int retryTimeout;
+ /**
+ * lockFilePath is the location of the lock file.
+ */
+ protected String lockFilePath;
+
public abstract boolean lock();
+ @Override
+ public String getLockFilePath() {
+ return this.lockFilePath;
+ }
+
/**
* API for enabling the locking of file with retries.
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
index ade4212..bc65ece 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
@@ -33,10 +33,6 @@ public class HdfsFileLock extends AbstractCarbonLock {
private static final LogService LOGGER =
LogServiceFactory.getLogService(HdfsFileLock.class.getName());
- /**
- * lockFilePath is the location of the lock file.
- */
- private String lockFilePath;
/**
* lockFileDir is the directory of the lock file.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/core/src/main/java/org/apache/carbondata/core/locks/ICarbonLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/ICarbonLock.java b/core/src/main/java/org/apache/carbondata/core/locks/ICarbonLock.java
index e964f0c..ab20a5e 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/ICarbonLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/ICarbonLock.java
@@ -51,4 +51,9 @@ public interface ICarbonLock {
*/
boolean releaseLockManually(String lockFile);
+ /**
+ * Return the path to the lock file
+ * @return lock file path
+ */
+ String getLockFilePath();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
index 1148ae2..5e3033e 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
@@ -36,11 +36,6 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
*/
public class LocalFileLock extends AbstractCarbonLock {
/**
- * lockFilePath is the location of the lock file.
- */
- private String lockFilePath;
-
- /**
* lockFileDir is the directory of the lock file.
*/
private String lockFileDir;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java
index 464becb..10bab28 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java
@@ -34,10 +34,6 @@ public class S3FileLock extends AbstractCarbonLock {
private static final LogService LOGGER =
LogServiceFactory.getLogService(S3FileLock.class.getName());
- /**
- * lockFilePath is the location of the lock file.
- */
- private String lockFilePath;
/**
* lockFileDir is the directory of the lock file.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 21f24d6..c606063 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -1186,6 +1186,14 @@ public class CarbonTable implements Serializable {
}
/**
+ * Return the format value defined in table properties
+ * @return String as per table properties, null if not defined
+ */
+ public String getFormat() {
+ return getTableInfo().getFactTable().getTableProperties().get("format");
+ }
+
+ /**
* Method to get the list of cached columns of the table.
* This method need to be used for Describe formatted like scenario where columns need to be
* displayed in the column create order
http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 2abf0e1..931e106 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -36,6 +36,7 @@ import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_
import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_MAJOR_COMPACTION_SIZE;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_OFFHEAP_SORT;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_UNSAFE_SORT;
@@ -162,6 +163,7 @@ public class SessionParams implements Serializable, Cloneable {
case CARBON_SEARCH_MODE_ENABLE:
case ENABLE_VECTOR_READER:
case ENABLE_UNSAFE_IN_QUERY_EXECUTION:
+ case ENABLE_AUTO_LOAD_MERGE:
isValid = CarbonUtil.validateBoolean(value);
if (!isValid) {
throw new InvalidConfigurationException("Invalid value " + value + " for key " + key);
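
With ENABLE_AUTO_LOAD_MERGE added to the boolean-validated keys above, SET on that property is now checked at the session level. A small sketch, assuming the constant's key is carbon.enable.auto.load.merge and that SET is routed through SessionParams as for the other listed keys:

    spark.sql("SET carbon.enable.auto.load.merge=true")   // accepted
    spark.sql("SET carbon.enable.auto.load.merge=maybe")  // rejected: InvalidConfigurationException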
http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/datamap/mv/plan/pom.xml
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/pom.xml b/datamap/mv/plan/pom.xml
index ff6976d..982724d 100644
--- a/datamap/mv/plan/pom.xml
+++ b/datamap/mv/plan/pom.xml
@@ -48,6 +48,12 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>net.jpountz.lz4</groupId>
+ <artifactId>lz4</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/examples/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/examples/spark2/pom.xml b/examples/spark2/pom.xml
index 91cb20f..bd497c5 100644
--- a/examples/spark2/pom.xml
+++ b/examples/spark2/pom.xml
@@ -55,6 +55,10 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamSQLExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamSQLExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamSQLExample.scala
new file mode 100644
index 0000000..58f51bd
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamSQLExample.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+import java.net.ServerSocket
+
+import org.apache.carbondata.examples.util.ExampleUtils
+
+// scalastyle:off println
+object StreamSQLExample {
+ def main(args: Array[String]) {
+
+ // setup paths
+ val rootPath = new File(this.getClass.getResource("/").getPath
+ + "../../../..").getCanonicalPath
+
+ val spark = ExampleUtils.createCarbonSession("StructuredStreamingExample", 4)
+
+ val requireCreateTable = true
+
+ if (requireCreateTable) {
+ // drop table if exists previously
+ spark.sql(s"DROP TABLE IF EXISTS sink")
+ spark.sql("DROP TABLE IF EXISTS source")
+
+ // Create target carbon table and populate with initial data
+ spark.sql(
+ s"""
+ | CREATE TABLE sink(
+ | id INT,
+ | name STRING,
+ | city STRING,
+ | salary FLOAT,
+ | file struct<school:array<string>, age:int>
+ | )
+ | STORED AS carbondata
+ | TBLPROPERTIES(
+ | 'streaming'='true', 'sort_columns'='')
+ """.stripMargin)
+ }
+
+ spark.sql(
+ """
+ | CREATE TABLE source (
+ | id INT,
+ | name STRING,
+ | city STRING,
+ | salary FLOAT,
+ | file struct<school:array<string>, age:int>
+ | )
+ | STORED AS carbondata
+ | TBLPROPERTIES(
+ | 'streaming'='source',
+ | 'format'='socket',
+ | 'host'='localhost',
+ | 'port'='7071')
+ """.stripMargin)
+
+ val serverSocket = new ServerSocket(7071)
+
+ // start ingest streaming job
+ spark.sql(
+ s"""
+ | CREATE STREAM ingest ON TABLE sink
+ | STMPROPERTIES(
+ | 'trigger' = 'ProcessingTime',
+ | 'interval' = '3 seconds')
+ | AS SELECT * FROM source
+ """.stripMargin)
+
+ // start writing data into the socket
+ import StructuredStreamingExample.{showTableCount, writeSocket}
+ val thread1 = writeSocket(serverSocket)
+ val thread2 = showTableCount(spark, "sink")
+
+ System.out.println("type enter to interrupt streaming")
+ System.in.read()
+ thread1.interrupt()
+ thread2.interrupt()
+ serverSocket.close()
+
+ // stop streaming job
+ spark.sql("DROP STREAM ingest").show
+
+ spark.stop()
+ System.out.println("streaming finished")
+ }
+
+}
+
+// scalastyle:on println
http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
index f88d8ee..31de668 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
@@ -130,6 +130,7 @@ object StructuredStreamingExample {
override def run(): Unit = {
for (_ <- 0 to 1000) {
spark.sql(s"select count(*) from $tableName").show(truncate = false)
+ spark.sql(s"show segments for table $tableName").show
Thread.sleep(1000 * 3)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
index b4c8250..832a1b2 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.hadoop.CarbonInputSplit;
@@ -83,6 +84,16 @@ public class Util {
}
}
+ public static StructType convertToSparkSchema(CarbonTable table) {
+ List<CarbonColumn> columns = table.getCreateOrderColumn(table.getTableName());
+ ColumnSchema[] schema = new ColumnSchema[columns.size()];
+ int i = 0;
+ for (CarbonColumn column : columns) {
+ schema[i++] = column.getColumnSchema();
+ }
+ return convertToSparkSchema(table, schema);
+ }
+
public static StructType convertToSparkSchema(CarbonTable table, ColumnSchema[] carbonColumns) {
List<StructField> fields = new ArrayList<>(carbonColumns.length);
for (int i = 0; i < carbonColumns.length; i++) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index 02c8607..3864b5d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -37,21 +37,23 @@ import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.streaming.segment.StreamSegment
object CarbonStore {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
def showSegments(
limit: Option[String],
- tableFolderPath: String,
+ tablePath: String,
showHistory: Boolean): Seq[Row] = {
+ val metaFolder = CarbonTablePath.getMetadataPath(tablePath)
val loadMetadataDetailsArray = if (showHistory) {
- SegmentStatusManager.readLoadMetadata(tableFolderPath) ++
- SegmentStatusManager.readLoadHistoryMetadata(tableFolderPath)
+ SegmentStatusManager.readLoadMetadata(metaFolder) ++
+ SegmentStatusManager.readLoadHistoryMetadata(metaFolder)
} else {
- SegmentStatusManager.readLoadMetadata(tableFolderPath)
+ SegmentStatusManager.readLoadMetadata(metaFolder)
}
if (loadMetadataDetailsArray.nonEmpty) {
@@ -84,18 +86,31 @@ object CarbonStore {
val startTime =
if (load.getLoadStartTime == CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) {
- null
+ "NA"
} else {
- new java.sql.Timestamp(load.getLoadStartTime)
+ new java.sql.Timestamp(load.getLoadStartTime).toString
}
val endTime =
if (load.getLoadEndTime == CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) {
- null
+ "NA"
} else {
- new java.sql.Timestamp(load.getLoadEndTime)
+ new java.sql.Timestamp(load.getLoadEndTime).toString
}
+ val (dataSize, indexSize) = if (load.getFileFormat == FileFormat.ROW_V1) {
+ // for streaming segment, we should get the actual size from the index file
+ // since it is continuously inserting data
+ val segmentDir = CarbonTablePath.getSegmentPath(tablePath, load.getLoadName)
+ val indexPath = CarbonTablePath.getCarbonStreamIndexFilePath(segmentDir)
+ val indices = StreamSegment.readIndexFile(indexPath, FileFactory.getFileType(indexPath))
+ (indices.asScala.map(_.getFile_size).sum, FileFactory.getCarbonFile(indexPath).getSize)
+ } else {
+ // for batch segment, we can get the data size from table status file directly
+ (if (load.getDataSize == null) 0L else load.getDataSize.toLong,
+ if (load.getIndexSize == null) 0L else load.getIndexSize.toLong)
+ }
+
if (showHistory) {
Row(
load.getLoadName,
@@ -104,9 +119,9 @@ object CarbonStore {
endTime,
mergedTo,
load.getFileFormat.toString,
- load.getVisibility(),
- Strings.formatSize(if (load.getDataSize == null) 0 else load.getDataSize.toFloat),
- Strings.formatSize(if (load.getIndexSize == null) 0 else load.getIndexSize.toFloat))
+ load.getVisibility,
+ Strings.formatSize(dataSize.toFloat),
+ Strings.formatSize(indexSize.toFloat))
} else {
Row(
load.getLoadName,
@@ -115,8 +130,8 @@ object CarbonStore {
endTime,
mergedTo,
load.getFileFormat.toString,
- Strings.formatSize(if (load.getDataSize == null) 0 else load.getDataSize.toFloat),
- Strings.formatSize(if (load.getIndexSize == null) 0 else load.getIndexSize.toFloat))
+ Strings.formatSize(dataSize.toFloat),
+ Strings.formatSize(indexSize.toFloat))
}
}.toSeq
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index ab6e320..57b2e44 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -410,6 +410,8 @@ object StreamHandoffRDD {
} else {
newSegment.get.setSegmentStatus(SegmentStatus.SUCCESS)
newSegment.get.setLoadEndTime(System.currentTimeMillis())
+ CarbonLoaderUtil.addDataIndexSizeIntoMetaEntry(newSegment.get, loadModel.getSegmentId,
+ loadModel.getCarbonDataLoadSchema.getCarbonTable)
}
// update streaming segment to compacted status
http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 5762906..6d93b34 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -43,6 +43,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.spark.rdd.StreamHandoffRDD
http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
index 00a5139..73c07b4 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField}
+import org.apache.carbondata.core.metadata.datatype.{ArrayType => CarbonArrayType, DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, MapType => CarbonMapType, StructField => CarbonStructField, StructType => CarbonStructType}
import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
import org.apache.carbondata.core.scan.expression.conditional._
import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
@@ -39,6 +39,20 @@ object CarbonSparkDataSourceUtil {
DecimalType(dataType.asInstanceOf[CarbonDecimalType].getPrecision,
dataType.asInstanceOf[CarbonDecimalType].getScale)
} else {
+ if (CarbonDataTypes.isStructType(dataType)) {
+ val struct = dataType.asInstanceOf[CarbonStructType]
+ return StructType(struct.getFields.asScala.map(x =>
+ StructField(x.getFieldName, convertCarbonToSparkDataType(x.getDataType)))
+ )
+ } else if (CarbonDataTypes.isArrayType(dataType)) {
+ val array = dataType.asInstanceOf[CarbonArrayType]
+ return ArrayType(convertCarbonToSparkDataType(array.getElementType))
+ } else if (CarbonDataTypes.isMapType(dataType)) {
+ val map = dataType.asInstanceOf[CarbonMapType]
+ return MapType(
+ convertCarbonToSparkDataType(map.getKeyType),
+ convertCarbonToSparkDataType(map.getValueType))
+ }
dataType match {
case CarbonDataTypes.STRING => StringType
case CarbonDataTypes.SHORT => ShortType
http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index 5af4fbe..8c8fd28 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -109,6 +109,12 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>${spark.deps.scope}</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
index a0c0545..b2687d0 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
@@ -107,4 +107,14 @@ object CarbonSparkUtil {
})
fields.mkString(",")
}
+
+ /**
+ * add escape prefix for delimiter
+ */
+ def delimiterConverter4Udf(delimiter: String): String = delimiter match {
+ case "|" | "*" | "." | ":" | "^" | "\\" | "$" | "+" | "?" | "(" | ")" | "{" | "}" | "[" | "]" =>
+ "\\\\" + delimiter
+ case _ =>
+ delimiter
+ }
}
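
The double-backslash prefix above matters because the delimiter is later interpolated into a SQL split() call (see createPlan in CarbonCreateStreamCommand below), and split() treats its second argument as a Java regex. A hedged illustration with a '|' delimiter:

    spark.sql("SELECT split('a|b|c', '\\\\|')").show()  // [a, b, c]: escaped, '|' matched literally
    spark.sql("SELECT split('a|b|c', '|')").show()      // unescaped '|' is regex alternation, so it
                                                        // does not split at the pipe characters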
http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
index f0d7bf5..23323d4 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
import scala.collection.JavaConverters._
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, DataFrame, SparkSession}
import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.{StructField, StructType}
@@ -29,7 +29,9 @@ import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.carbondata.common.exceptions.NoSuchStreamException
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
import org.apache.carbondata.spark.StreamingOption
import org.apache.carbondata.streaming.CarbonStreamException
@@ -46,25 +48,29 @@ object StreamJobManager {
private def validateSourceTable(source: CarbonTable): Unit = {
if (!source.isStreamingSource) {
- throw new MalformedCarbonCommandException(s"Table ${source.getTableName} is not " +
- "streaming source table " +
- "('streaming' tblproperty is not 'source')")
+ throw new MalformedCarbonCommandException(
+ s"Table ${source.getTableName} is not streaming source table " +
+ "('streaming' tblproperty is not 'source')")
}
}
- private def validateSinkTable(querySchema: StructType, sink: CarbonTable): Unit = {
+ private def validateSinkTable(validateQuerySchema: Boolean,
+ querySchema: StructType, sink: CarbonTable): Unit = {
if (!sink.isStreamingSink) {
- throw new MalformedCarbonCommandException(s"Table ${sink.getTableName} is not " +
- "streaming sink table " +
- "('streaming' tblproperty is not 'sink' or 'true')")
+ throw new MalformedCarbonCommandException(
+ s"Table ${sink.getTableName} is not streaming sink table " +
+ "('streaming' tblproperty is not 'sink' or 'true')")
}
- val fields = sink.getCreateOrderColumn(sink.getTableName).asScala.map { column =>
- StructField(column.getColName,
- CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(column.getDataType))
- }
- if (!querySchema.equals(StructType(fields))) {
- throw new MalformedCarbonCommandException(s"Schema of table ${sink.getTableName} " +
- s"does not match query output")
+ if (validateQuerySchema) {
+ val fields = sink.getCreateOrderColumn(sink.getTableName).asScala.map { column =>
+ StructField(
+ column.getColName,
+ CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(column.getDataType))
+ }
+ if (!querySchema.equals(StructType(fields))) {
+ throw new MalformedCarbonCommandException(
+ s"Schema of table ${sink.getTableName} does not match query output")
+ }
}
}
@@ -102,7 +108,12 @@ object StreamJobManager {
}
validateSourceTable(sourceTable)
- validateSinkTable(streamDf.schema, sinkTable)
+
+ // for kafka and socket stream source, the source table schema is one string column only
+ // so we do not validate the query schema against the sink table schema
+ val isKafka = Option(sourceTable.getFormat).contains("kafka")
+ val isSocket = Option(sourceTable.getFormat).contains("socket")
+ validateSinkTable(!(isKafka || isSocket), streamDf.schema, sinkTable)
// start a new thread to run the streaming ingest job, the job will be running
// until user stops it by STOP STREAM JOB
http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index a13dfdc..8b6dabd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -330,10 +330,13 @@ case class CarbonAlterTableCompactionCommand(
): Unit = {
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- // 1. acquire lock of streaming.lock
+ // 1. delete the lock of streaming.lock, forcing the stream to be closed
val streamingLock = CarbonLockFactory.getCarbonLockObj(
carbonTable.getTableInfo.getOrCreateAbsoluteTableIdentifier,
LockUsage.STREAMING_LOCK)
+ if (!FileFactory.getCarbonFile(streamingLock.getLockFilePath).delete()) {
+ LOGGER.warn("failed to delete lock file: " + streamingLock.getLockFilePath)
+ }
try {
if (streamingLock.lockWithRetries()) {
// 2. convert segment status from "streaming" to "streaming finish"
http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
index 3d57a99..3f68cc4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
@@ -37,8 +37,8 @@ case class CarbonShowLoadsCommand(
if (showHistory) {
Seq(AttributeReference("SegmentSequenceId", StringType, nullable = false)(),
AttributeReference("Status", StringType, nullable = false)(),
- AttributeReference("Load Start Time", TimestampType, nullable = false)(),
- AttributeReference("Load End Time", TimestampType, nullable = true)(),
+ AttributeReference("Load Start Time", StringType, nullable = false)(),
+ AttributeReference("Load End Time", StringType, nullable = true)(),
AttributeReference("Merged To", StringType, nullable = false)(),
AttributeReference("File Format", StringType, nullable = false)(),
AttributeReference("Visibility", StringType, nullable = false)(),
@@ -47,8 +47,8 @@ case class CarbonShowLoadsCommand(
} else {
Seq(AttributeReference("SegmentSequenceId", StringType, nullable = false)(),
AttributeReference("Status", StringType, nullable = false)(),
- AttributeReference("Load Start Time", TimestampType, nullable = false)(),
- AttributeReference("Load End Time", TimestampType, nullable = true)(),
+ AttributeReference("Load Start Time", StringType, nullable = false)(),
+ AttributeReference("Load End Time", StringType, nullable = true)(),
AttributeReference("Merged To", StringType, nullable = false)(),
AttributeReference("File Format", StringType, nullable = false)(),
AttributeReference("Data Size", StringType, nullable = false)(),
@@ -64,7 +64,7 @@ case class CarbonShowLoadsCommand(
}
CarbonStore.showSegments(
limit,
- carbonTable.getMetadataPath,
+ carbonTable.getTablePath,
showHistory
)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
index 1e1ab44..94e063b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
@@ -17,11 +17,14 @@
package org.apache.spark.sql.execution.command.stream
+import java.util
+
import scala.collection.JavaConverters._
+import scala.collection.mutable
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, ExprId, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.execution.command.DataCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.streaming.StreamingRelation
@@ -29,8 +32,9 @@ import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
import org.apache.carbondata.spark.StreamingOption
-import org.apache.carbondata.spark.util.Util
+import org.apache.carbondata.spark.util.{CarbonSparkUtil, Util}
import org.apache.carbondata.stream.StreamJobManager
/**
@@ -51,29 +55,57 @@ case class CarbonCreateStreamCommand(
AttributeReference("Status", StringType, nullable = false)())
override def processData(sparkSession: SparkSession): Seq[Row] = {
- val df = sparkSession.sql(query)
- var sourceTable: CarbonTable = null
-
- // find the streaming source table in the query
- // and replace it with StreamingRelation
- val streamLp = df.logicalPlan transform {
+ val inputQuery = sparkSession.sql(query)
+ val sourceTableSeq = inputQuery.logicalPlan collect {
case r: LogicalRelation
if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource =>
- val (source, streamingRelation) = prepareStreamingRelation(sparkSession, r)
- if (sourceTable != null && sourceTable.getTableName != source.getTableName) {
- throw new MalformedCarbonCommandException(
- "Stream query on more than one stream source table is not supported")
- }
- sourceTable = source
- streamingRelation
- case plan: LogicalPlan => plan
+ r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+ }
+ if (sourceTableSeq.isEmpty) {
+ throw new MalformedCarbonCommandException(
+ "Must specify stream source table in the stream query")
+ }
+ if (sourceTableSeq.size > 1) {
+ throw new MalformedCarbonCommandException(
+ "Stream query on more than one stream source table is not supported")
+ }
+ val sourceTable = sourceTableSeq.head
+
+ val tblProperty = sourceTable.getTableInfo.getFactTable.getTableProperties
+ val format = sourceTable.getFormat
+ if (format == null) {
+ throw new MalformedCarbonCommandException("Streaming from carbon file is not supported")
+ }
+ val updatedQuery = if (format.equals("kafka")) {
+ shouldHaveProperty(tblProperty, "kafka.bootstrap.servers", sourceTable)
+ shouldHaveProperty(tblProperty, "subscribe", sourceTable)
+ createPlan(sparkSession, inputQuery, sourceTable, "kafka", tblProperty)
+ } else if (format.equals("socket")) {
+ shouldHaveProperty(tblProperty, "host", sourceTable)
+ shouldHaveProperty(tblProperty, "port", sourceTable)
+ createPlan(sparkSession, inputQuery, sourceTable, "socket", tblProperty)
+ } else {
+ // Replace the logical relation with a streaming relation created
+ // from the stream source table
+ inputQuery.logicalPlan transform {
+ case r: LogicalRelation
+ if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+ r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource
+ => prepareStreamingRelation(sparkSession, r)
+ case plan: LogicalPlan => plan
+ }
}
if (sourceTable == null) {
throw new MalformedCarbonCommandException("Must specify stream source table in the query")
}
+ // add CSV row parser if user does not specify
+ val newMap = mutable.Map[String, String]()
+ optionMap.foreach(x => newMap(x._1) = x._2)
+ newMap(CSVInputFormat.DELIMITER) = tblProperty.asScala.getOrElse("delimiter", ",")
+
// start the streaming job
val jobId = StreamJobManager.startStream(
sparkSession = sparkSession,
@@ -82,33 +114,140 @@ case class CarbonCreateStreamCommand(
sourceTable = sourceTable,
sinkTable = CarbonEnv.getCarbonTable(sinkDbName, sinkTableName)(sparkSession),
query = query,
- streamDf = Dataset.ofRows(sparkSession, streamLp),
- options = new StreamingOption(optionMap)
+ streamDf = Dataset.ofRows(sparkSession, updatedQuery),
+ options = new StreamingOption(newMap.toMap)
)
Seq(Row(streamName, jobId, "RUNNING"))
}
+ /**
+ * Create a new plan for the stream query on kafka and Socket source table.
+ * This is required because we need to convert the schema of the data stored in kafka
+ * The returned logical plan contains the complete plan tree of original plan with
+ * logical relation replaced with a streaming relation.
+ *
+ * @param sparkSession spark session
+ * @param inputQuery stream query from user
+ * @param sourceTable source table (kafka table)
+ * @param sourceName source name, kafka or socket
+ * @param tblProperty table property of source table
+ * @return a new logical plan
+ */
+ private def createPlan(
+ sparkSession: SparkSession,
+ inputQuery: DataFrame,
+ sourceTable: CarbonTable,
+ sourceName: String,
+ tblProperty: util.Map[String, String]): LogicalPlan = {
+ // We follow 3 steps to generate new plan
+ // 1. replace the logical relation in stream query with streaming relation
+ // 2. collect the new ExprId generated
+ // 3. update the stream query plan with the new ExprId generated, to make the plan consistent
+
+ // exprList is used for UDF to extract the data from the 'value' column in kafka
+ val columnNames = Util.convertToSparkSchema(sourceTable).fieldNames
+ val exprList = columnNames.zipWithIndex.map {
+ case (columnName, i) =>
+ s"case when size(_values) > $i then _values[$i] else null end AS $columnName"
+ }
+
+ val delimiter = tblProperty.asScala.getOrElse("delimiter", ",")
+ val aliasMap = new util.HashMap[String, ExprId]()
+ val updatedQuery = inputQuery.logicalPlan transform {
+ case r: LogicalRelation
+ if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+ r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource =>
+ // for kafka stream source, get the 'value' column and split it by using UDF
+ val kafkaPlan = sparkSession.readStream
+ .format(sourceName)
+ .options(tblProperty)
+ .load()
+ .selectExpr("CAST(value as string) as _value")
+ .selectExpr(
+ s"split(_value, '${CarbonSparkUtil.delimiterConverter4Udf(delimiter)}') as _values")
+ .selectExpr(exprList: _*)
+ .logicalPlan
+
+ // collect the newly generated ExprId
+ kafkaPlan collect {
+ case p@Project(projectList, child) =>
+ projectList.map { expr =>
+ aliasMap.put(expr.name, expr.exprId)
+ }
+ p
+ }
+ kafkaPlan
+ case plan: LogicalPlan => plan
+ }
+
+ // transform the stream plan to replace all attribute with the collected ExprId
+ val transFormedPlan = updatedQuery transform {
+ case p@Project(projectList: Seq[NamedExpression], child) =>
+ val newProjectList = projectList.map { expr =>
+ val newExpr = expr transform {
+ case attribute: Attribute =>
+ val exprId: ExprId = aliasMap.get(attribute.name)
+ if (exprId != null) {
+ if (exprId.id != attribute.exprId.id) {
+ AttributeReference(
+ attribute.name, attribute.dataType, attribute.nullable,
+ attribute.metadata)(exprId, attribute.qualifier)
+ } else {
+ attribute
+ }
+ } else {
+ attribute
+ }
+ }
+ newExpr.asInstanceOf[NamedExpression]
+ }
+ Project(newProjectList, child)
+ case f@Filter(condition: Expression, child) =>
+ val newCondition = condition transform {
+ case attribute: Attribute =>
+ val exprId: ExprId = aliasMap.get(attribute.name)
+ if (exprId != null) {
+ if (exprId.id != attribute.exprId.id) {
+ AttributeReference(
+ attribute.name, attribute.dataType, attribute.nullable,
+ attribute.metadata)(exprId, attribute.qualifier)
+ } else {
+ attribute
+ }
+ } else {
+ attribute
+ }
+ }
+ Filter(newCondition, child)
+ }
+ transFormedPlan
+ }
+
+ /**
+ * Create a streaming relation from the input logical relation (source table)
+ *
+ * @param sparkSession spark session
+ * @param logicalRelation source table to convert
+ * @return sourceTable and its streaming relation
+ */
private def prepareStreamingRelation(
sparkSession: SparkSession,
- r: LogicalRelation): (CarbonTable, StreamingRelation) = {
- val sourceTable = r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+ logicalRelation: LogicalRelation): StreamingRelation = {
+ val sourceTable = logicalRelation.relation
+ .asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
val tblProperty = sourceTable.getTableInfo.getFactTable.getTableProperties
- val format = tblProperty.get("format")
+ val format = sourceTable.getFormat
if (format == null) {
throw new MalformedCarbonCommandException("Streaming from carbon file is not supported")
}
- val streamReader = sparkSession.readStream
+ val streamReader = sparkSession
+ .readStream
.schema(getSparkSchema(sourceTable))
.format(format)
- val dataFrame = format match {
+ val dataFrame: DataFrame = format match {
case "csv" | "text" | "json" | "parquet" =>
- if (!tblProperty.containsKey("path")) {
- throw new MalformedCarbonCommandException(
- s"'path' tblproperty should be provided for '$format' format")
- }
+ shouldHaveProperty(tblProperty, "path", sourceTable)
streamReader.load(tblProperty.get("path"))
- case "kafka" | "socket" =>
- streamReader.load()
case other =>
throw new MalformedCarbonCommandException(s"Streaming from $format is not supported")
}
@@ -116,8 +255,18 @@ case class CarbonCreateStreamCommand(
// Since SparkSQL analyzer will match the UUID in attribute,
// create a new StreamRelation and re-use the same attribute from LogicalRelation
- (sourceTable,
- StreamingRelation(streamRelation.dataSource, streamRelation.sourceName, r.output))
+ StreamingRelation(streamRelation.dataSource, streamRelation.sourceName, logicalRelation.output)
+ }
+
+ private def shouldHaveProperty(
+ tblProperty: java.util.Map[String, String],
+ propertyName: String,
+ sourceTable: CarbonTable) : Unit = {
+ if (!tblProperty.containsKey(propertyName)) {
+ throw new MalformedCarbonCommandException(
+ s"tblproperty '$propertyName' should be provided for stream source " +
+ s"${sourceTable.getDatabaseName}.${sourceTable.getTableName}")
+ }
}
private def getSparkSchema(sourceTable: CarbonTable): StructType = {
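
createPlan above is the heart of the Kafka path: it casts the raw Kafka 'value' to text, splits it with the table's delimiter, and projects one (string-typed) column per field of the source table schema. A minimal standalone sketch of that parsing chain, assuming a local broker and an illustrative topic name:

    val parsed = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "events")
      .load()
      .selectExpr("CAST(value AS STRING) AS _value")   // raw bytes -> text
      .selectExpr("split(_value, ',') AS _values")     // delimited text -> array
      .selectExpr(                                     // array -> named columns
        "CASE WHEN size(_values) > 0 THEN _values[0] ELSE NULL END AS id",
        "CASE WHEN size(_values) > 1 THEN _values[1] ELSE NULL END AS name")

The ExprId remapping that follows in the committed code is needed because this freshly built plan allocates new attribute ids, while the user's Project and Filter nodes still reference the ids of the original logical relation.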
http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 31c9597..baf4664 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -1770,28 +1770,18 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
|CREATE STREAM stream123 ON TABLE sink
|STMPROPERTIES(
| 'trigger'='ProcessingTime',
- | 'interval'='1 seconds')
- |AS
- | SELECT *
- | FROM source
- | WHERE id % 2 = 1
- """.stripMargin).show(false)
- sql(
- """
- |CREATE STREAM IF NOT EXISTS stream123 ON TABLE sink
- |STMPROPERTIES(
- | 'trigger'='ProcessingTime',
- | 'interval'='1 seconds')
+ | 'interval'='5 seconds')
|AS
| SELECT *
| FROM source
| WHERE id % 2 = 1
""".stripMargin).show(false)
+
Thread.sleep(200)
sql("select * from sink").show
generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir, SaveMode.Append)
- Thread.sleep(5000)
+ Thread.sleep(7000)
// after 2 minibatch, there should be 10 row added (filter condition: id%2=1)
checkAnswer(sql("select count(*) from sink"), Seq(Row(10)))
@@ -2098,6 +2088,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
}
test("StreamSQL: start stream on non-stream table") {
+ sql("DROP TABLE IF EXISTS source")
+ sql("DROP TABLE IF EXISTS sink")
+
sql(
s"""
|CREATE TABLE notsource(
@@ -2143,7 +2136,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
| WHERE id % 2 = 1
""".stripMargin).show(false)
}
- assert(ex.getMessage.contains("Must specify stream source table in the query"))
+ assert(ex.getMessage.contains("Must specify stream source table in the stream query"))
sql("DROP TABLE sink")
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d55c21a..a6679fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -218,6 +218,12 @@
<scope>${spark.deps.scope}</scope>
</dependency>
<dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>${spark.deps.scope}</scope>
+ </dependency>
+ <dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
index 6f7c398..00d8420 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
@@ -78,6 +78,9 @@ public class RowParserImpl implements RowParser {
@Override
public Object[] parseRow(Object[] row) {
+ if (row == null) {
+ return new String[numberOfColumns];
+ }
// If number of columns are less in a row then create new array with same size of header.
if (row.length < numberOfColumns) {
String[] temp = new String[numberOfColumns];
http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index 89f00c9..744915d 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -419,7 +419,7 @@ public class StreamSegment {
* @return the list of BlockIndex in the index file
* @throws IOException
*/
- private static List<BlockIndex> readIndexFile(String indexPath, FileFactory.FileType fileType)
+ public static List<BlockIndex> readIndexFile(String indexPath, FileFactory.FileType fileType)
throws IOException {
List<BlockIndex> blockIndexList = new ArrayList<>();
CarbonFile index = FileFactory.getCarbonFile(indexPath, fileType);