You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/12/14 04:50:30 UTC
carbondata git commit: [CARBONDATA-1879][Streaming] Support alter table to change the status of the streaming segment
Repository: carbondata
Updated Branches:
refs/heads/master e5e74fc90 -> 6026680a1
[CARBONDATA-1879][Streaming] Support alter table to change the status of the streaming segment
Add a new SQL command that changes the status of streaming segments from 'streaming' to 'streaming finish': ALTER TABLE [dbname.]tablename FINISH STREAMING
This closes #1638
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6026680a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6026680a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6026680a
Branch: refs/heads/master
Commit: 6026680a13c0245fe2140b2fa7af67f1baf639d7
Parents: e5e74fc
Author: QiangCai <qi...@qq.com>
Authored: Sun Dec 10 21:01:56 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Thu Dec 14 12:50:11 2017 +0800
----------------------------------------------------------------------
.../ThriftWrapperSchemaConverterImpl.java | 2 +-
.../hadoop/test/util/StoreCreator.java | 2 +-
.../spark/sql/catalyst/CarbonDDLSqlParser.scala | 2 +
.../CarbonAlterTableFinishStreaming.scala | 37 +++++++++++
.../sql/parser/CarbonSpark2SqlParser.scala | 17 ++++-
.../TestStreamingTableOperation.scala | 31 +++++++++
.../carbondata/processing/StoreCreator.java | 2 +-
.../streaming/segment/StreamSegment.java | 66 ++++++++++++++++++++
8 files changed, 153 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026680a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index 945a40c..1316a25 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -635,13 +635,13 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
schemaEvolutionList.get(schemaEvolutionList.size() - 1).getTime_stamp());
wrapperTableInfo.setDatabaseName(dbName);
wrapperTableInfo.setTableUniqueName(CarbonTable.buildUniqueName(dbName, tableName));
- wrapperTableInfo.setTablePath(tablePath);
wrapperTableInfo.setFactTable(
fromExternalToWrapperTableSchema(externalTableInfo.getFact_table(), tableName));
if (null != externalTableInfo.getDataMapSchemas()) {
wrapperTableInfo.setDataMapSchemaList(
fromExternalToWrapperChildSchemaList(externalTableInfo.getDataMapSchemas()));
}
+ wrapperTableInfo.setTablePath(tablePath);
return wrapperTableInfo;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026680a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index ab22945..531bed5 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -186,7 +186,6 @@ public class StoreCreator {
public static CarbonTable createTable(
AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException {
TableInfo tableInfo = new TableInfo();
- tableInfo.setTablePath(absoluteTableIdentifier.getTablePath());
tableInfo.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
TableSchema tableSchema = new TableSchema();
tableSchema.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
@@ -278,6 +277,7 @@ public class StoreCreator {
);
tableInfo.setLastUpdatedTime(System.currentTimeMillis());
tableInfo.setFactTable(tableSchema);
+ tableInfo.setTablePath(absoluteTableIdentifier.getTablePath());
CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
String schemaFilePath = carbonTablePath.getSchemaFilePath();
String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026680a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index f405902..9b3d969 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -66,6 +66,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
protected val COLS = carbonKeyWord("COLS")
protected val COLUMNS = carbonKeyWord("COLUMNS")
protected val COMPACT = carbonKeyWord("COMPACT")
+ protected val FINISH = carbonKeyWord("FINISH")
+ protected val STREAMING = carbonKeyWord("STREAMING")
protected val CREATE = carbonKeyWord("CREATE")
protected val CUBE = carbonKeyWord("CUBE")
protected val CUBES = carbonKeyWord("CUBES")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026680a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
new file mode 100644
index 0000000..ce83815
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.MetadataCommand
+
+import org.apache.carbondata.streaming.segment.StreamSegment
+
+/**
+ * This command will try to change the status of the segment from "streaming" to "streaming finish"
+ */
+case class CarbonAlterTableFinishStreaming(
+ dbName: Option[String],
+ tableName: String)
+ extends MetadataCommand {
+ override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+ val carbonTable = CarbonEnv.getCarbonTable(dbName, tableName)(sparkSession)
+ StreamSegment.finishStreaming(carbonTable)
+ Seq.empty
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026680a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index ffa2d32..dad0e3e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.parser
import scala.collection.mutable
import scala.language.implicitConversions
-import org.apache.spark.sql.{DeleteRecords, UpdateTable}
+import org.apache.spark.sql.{CarbonEnv, DeleteRecords, UpdateTable}
import org.apache.spark.sql.catalyst.{CarbonDDLSqlParser, TableIdentifier}
import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
import org.apache.spark.sql.catalyst.plans.logical._
@@ -71,8 +71,8 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
protected lazy val start: Parser[LogicalPlan] = explainPlan | startCommand
protected lazy val startCommand: Parser[LogicalPlan] =
- loadManagement|showLoads|alterTable|restructure|updateTable|deleteRecords|
- alterPartition|datamapManagement
+ loadManagement | showLoads | alterTable | restructure | updateTable | deleteRecords |
+ alterPartition | datamapManagement | alterTableFinishStreaming
protected lazy val loadManagement: Parser[LogicalPlan] =
deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew
@@ -130,6 +130,17 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
}
/**
+ * The syntax below changes the status of streaming segments
+ * from "streaming" to "streaming finish".
+ * ALTER TABLE [dbName.]tableName FINISH STREAMING
+ */
+ protected lazy val alterTableFinishStreaming: Parser[LogicalPlan] =
+ ALTER ~> TABLE ~> (ident <~ ".").? ~ ident <~ FINISH <~ STREAMING <~ opt(";") ^^ {
+ case dbName ~ table =>
+ CarbonAlterTableFinishStreaming(dbName, table)
+ }
+
+ /**
* The syntax of datamap creation is as follows.
* CREATE DATAMAP datamapName ON TABLE tableName USING 'DataMapClassName'
* DMPROPERTIES('KEY'='VALUE') AS SELECT COUNT(COL1) FROM tableName
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026680a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index ad439f2..12a8b8b 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -111,6 +111,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
// 13. handoff streaming segment
createTable(tableName = "stream_table_handoff", streaming = true, withBatchLoad = false)
+
+ // 14. finish streaming
+ createTable(tableName = "stream_table_finish", streaming = true, withBatchLoad = true)
}
test("validate streaming property") {
@@ -192,6 +195,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists streaming.stream_table_delete")
sql("drop table if exists streaming.stream_table_alter")
sql("drop table if exists streaming.stream_table_handoff")
+ sql("drop table if exists streaming.stream_table_finish")
}
// normal table not support streaming ingest
@@ -721,6 +725,33 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
)
}
+ test("alter table finish streaming") {
+ executeStreamingIngest(
+ tableName = "stream_table_finish",
+ batchNums = 6,
+ rowNumsEachBatch = 10000,
+ intervalOfSource = 5,
+ intervalOfIngest = 10,
+ continueSeconds = 40,
+ generateBadRecords = false,
+ badRecordAction = "force",
+ handoffSize = 1024L * 200
+ )
+ sql("alter table streaming.stream_table_finish finish streaming")
+ sql("show segments for table streaming.stream_table_finish").show(100, false)
+
+ val segments = sql("show segments for table streaming.stream_table_finish").collect()
+ assertResult(4)(segments.length)
+ assertResult("Streaming Finish")(segments(0).getString(1))
+ assertResult("Streaming Finish")(segments(1).getString(1))
+ assertResult("Streaming Finish")(segments(2).getString(1))
+ assertResult("Success")(segments(3).getString(1))
+ checkAnswer(
+ sql("select count(*) from streaming.stream_table_finish"),
+ Seq(Row(5 + 6 * 10000))
+ )
+ }
+
test("do not support creating datamap on streaming table") {
assert(
intercept[MalformedCarbonCommandException](
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026680a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index 24b7415..e662757 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -176,7 +176,6 @@ public class StoreCreator {
private static CarbonTable createTable() throws IOException {
TableInfo tableInfo = new TableInfo();
- tableInfo.setTablePath(absoluteTableIdentifier.getTablePath());
tableInfo.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
TableSchema tableSchema = new TableSchema();
tableSchema.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
@@ -263,6 +262,7 @@ public class StoreCreator {
);
tableInfo.setLastUpdatedTime(System.currentTimeMillis());
tableInfo.setFactTable(tableSchema);
+ tableInfo.setTablePath(absoluteTableIdentifier.getTablePath());
CarbonTablePath carbonTablePath = CarbonStorePath
.getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026680a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index d733a74..7a62183 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -29,7 +29,9 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.locks.CarbonLockFactory;
import org.apache.carbondata.core.locks.ICarbonLock;
+import org.apache.carbondata.core.locks.LockUsage;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.reader.CarbonIndexFileReader;
import org.apache.carbondata.core.statusmanager.FileFormat;
@@ -181,6 +183,70 @@ public class StreamSegment {
}
/**
+ * change the status of the segment from "streaming" to "streaming finish"
+ */
+ public static void finishStreaming(CarbonTable carbonTable) throws IOException {
+ ICarbonLock streamingLock = CarbonLockFactory.getCarbonLockObj(
+ carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
+ LockUsage.STREAMING_LOCK);
+ try {
+ if (streamingLock.lockWithRetries()) {
+ ICarbonLock statusLock = CarbonLockFactory.getCarbonLockObj(
+ carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
+ LockUsage.TABLE_STATUS_LOCK);
+ try {
+ if (statusLock.lockWithRetries()) {
+ LoadMetadataDetails[] details =
+ SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath());
+ boolean updated = false;
+ for (LoadMetadataDetails detail : details) {
+ if (SegmentStatus.STREAMING == detail.getSegmentStatus()) {
+ detail.setLoadEndTime(System.currentTimeMillis());
+ detail.setSegmentStatus(SegmentStatus.STREAMING_FINISH);
+ updated = true;
+ }
+ }
+ if (updated) {
+ CarbonTablePath tablePath =
+ CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier());
+ SegmentStatusManager.writeLoadDetailsIntoFile(
+ tablePath.getTableStatusFilePath(), details);
+ }
+ } else {
+ String msg = "Failed to acquire table status lock of " +
+ carbonTable.getDatabaseName() + "." + carbonTable.getTableName();
+ LOGGER.error(msg);
+ throw new IOException(msg);
+ }
+ } finally {
+ if (statusLock.unlock()) {
+ LOGGER.info("Table unlocked successfully after table status updation" +
+ carbonTable.getDatabaseName() + "." + carbonTable.getTableName());
+ } else {
+ LOGGER.error("Unable to unlock Table lock for table " +
+ carbonTable.getDatabaseName() + "." + carbonTable.getTableName() +
+ " during table status updation");
+ }
+ }
+ } else {
+ String msg = "Failed to finish streaming, because streaming is locked for table " +
+ carbonTable.getDatabaseName() + "." + carbonTable.getTableName();
+ LOGGER.error(msg);
+ throw new IOException(msg);
+ }
+ } finally {
+ if (streamingLock.unlock()) {
+ LOGGER.info("Table unlocked successfully after streaming finished" + carbonTable
+ .getDatabaseName() + "." + carbonTable.getTableName());
+ } else {
+ LOGGER.error("Unable to unlock Table lock for table " +
+ carbonTable.getDatabaseName() + "." + carbonTable.getTableName() +
+ " during streaming finished");
+ }
+ }
+ }
+
+ /**
* invoke CarbonStreamOutputFormat to append batch data to existing carbondata file
*/
public static void appendBatchData(CarbonIterator<Object[]> inputIterators,