You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/12/14 04:50:30 UTC

carbondata git commit: [CARBONDATA-1879][Streaming] Support alter table to change the status of the streaming segment

Repository: carbondata
Updated Branches:
  refs/heads/master e5e74fc90 -> 6026680a1


[CARBONDATA-1879][Streaming] Support alter table to change the status of the streaming segment

Support a new SQL command that changes the status of streaming segments from 'streaming' to 'streaming finish': ALTER TABLE [dbname.]tablename FINISH STREAMING

This closes #1638


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6026680a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6026680a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6026680a

Branch: refs/heads/master
Commit: 6026680a13c0245fe2140b2fa7af67f1baf639d7
Parents: e5e74fc
Author: QiangCai <qi...@qq.com>
Authored: Sun Dec 10 21:01:56 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Thu Dec 14 12:50:11 2017 +0800

----------------------------------------------------------------------
 .../ThriftWrapperSchemaConverterImpl.java       |  2 +-
 .../hadoop/test/util/StoreCreator.java          |  2 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |  2 +
 .../CarbonAlterTableFinishStreaming.scala       | 37 +++++++++++
 .../sql/parser/CarbonSpark2SqlParser.scala      | 17 ++++-
 .../TestStreamingTableOperation.scala           | 31 +++++++++
 .../carbondata/processing/StoreCreator.java     |  2 +-
 .../streaming/segment/StreamSegment.java        | 66 ++++++++++++++++++++
 8 files changed, 153 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026680a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index 945a40c..1316a25 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -635,13 +635,13 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
         schemaEvolutionList.get(schemaEvolutionList.size() - 1).getTime_stamp());
     wrapperTableInfo.setDatabaseName(dbName);
     wrapperTableInfo.setTableUniqueName(CarbonTable.buildUniqueName(dbName, tableName));
-    wrapperTableInfo.setTablePath(tablePath);
     wrapperTableInfo.setFactTable(
         fromExternalToWrapperTableSchema(externalTableInfo.getFact_table(), tableName));
     if (null != externalTableInfo.getDataMapSchemas()) {
       wrapperTableInfo.setDataMapSchemaList(
           fromExternalToWrapperChildSchemaList(externalTableInfo.getDataMapSchemas()));
     }
+    wrapperTableInfo.setTablePath(tablePath);
     return wrapperTableInfo;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026680a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index ab22945..531bed5 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -186,7 +186,6 @@ public class StoreCreator {
   public static CarbonTable createTable(
       AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException {
     TableInfo tableInfo = new TableInfo();
-    tableInfo.setTablePath(absoluteTableIdentifier.getTablePath());
     tableInfo.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
     TableSchema tableSchema = new TableSchema();
     tableSchema.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
@@ -278,6 +277,7 @@ public class StoreCreator {
     );
     tableInfo.setLastUpdatedTime(System.currentTimeMillis());
     tableInfo.setFactTable(tableSchema);
+    tableInfo.setTablePath(absoluteTableIdentifier.getTablePath());
     CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
     String schemaFilePath = carbonTablePath.getSchemaFilePath();
     String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026680a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index f405902..9b3d969 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -66,6 +66,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   protected val COLS = carbonKeyWord("COLS")
   protected val COLUMNS = carbonKeyWord("COLUMNS")
   protected val COMPACT = carbonKeyWord("COMPACT")
+  protected val FINISH = carbonKeyWord("FINISH")  // ALTER TABLE ... FINISH STREAMING
+  protected val STREAMING = carbonKeyWord("STREAMING")  // ALTER TABLE ... FINISH STREAMING
   protected val CREATE = carbonKeyWord("CREATE")
   protected val CUBE = carbonKeyWord("CUBE")
   protected val CUBES = carbonKeyWord("CUBES")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026680a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
new file mode 100644
index 0000000..ce83815
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.MetadataCommand
+
+import org.apache.carbondata.streaming.segment.StreamSegment
+
+/**
+ * Implements `ALTER TABLE [dbName.]tableName FINISH STREAMING`: every segment of the table whose
+ * status is "streaming" is switched to "streaming finish". The command returns no rows.
+ */
+case class CarbonAlterTableFinishStreaming(dbName: Option[String], tableName: String)
+  extends MetadataCommand {
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    // look up the CarbonTable via CarbonEnv, then delegate the status change to StreamSegment
+    StreamSegment.finishStreaming(CarbonEnv.getCarbonTable(dbName, tableName)(sparkSession))
+    Seq.empty
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026680a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index ffa2d32..dad0e3e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.parser
 import scala.collection.mutable
 import scala.language.implicitConversions
 
-import org.apache.spark.sql.{DeleteRecords, UpdateTable}
+import org.apache.spark.sql.{CarbonEnv, DeleteRecords, UpdateTable}
 import org.apache.spark.sql.catalyst.{CarbonDDLSqlParser, TableIdentifier}
 import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -71,8 +71,8 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
   protected lazy val start: Parser[LogicalPlan] = explainPlan | startCommand
 
   protected lazy val startCommand: Parser[LogicalPlan] =
-    loadManagement|showLoads|alterTable|restructure|updateTable|deleteRecords|
-    alterPartition|datamapManagement
+    loadManagement | showLoads | alterTable | restructure | updateTable | deleteRecords |
+    alterPartition | datamapManagement | alterTableFinishStreaming
 
   protected lazy val loadManagement: Parser[LogicalPlan] =
     deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew
@@ -130,6 +130,17 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     }
 
   /**
+   * Parses the command that moves streaming segments from "streaming" to "streaming finish":
+   * ALTER TABLE [dbName.]tableName FINISH STREAMING
+   * The database prefix and the trailing semicolon are optional.
+   */
+  protected lazy val alterTableFinishStreaming: Parser[LogicalPlan] =
+    ALTER ~> TABLE ~> opt(ident <~ ".") ~ ident <~ FINISH <~ STREAMING <~ opt(";") ^^ {
+      case database ~ table =>
+        CarbonAlterTableFinishStreaming(database, table)
+    }
+
+  /**
    * The syntax of datamap creation is as follows.
    * CREATE DATAMAP datamapName ON TABLE tableName USING 'DataMapClassName'
    * DMPROPERTIES('KEY'='VALUE') AS SELECT COUNT(COL1) FROM tableName

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026680a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index ad439f2..12a8b8b 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -111,6 +111,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 
     // 13. handoff streaming segment
     createTable(tableName = "stream_table_handoff", streaming = true, withBatchLoad = false)
+
+    // 14. finish streaming
+    createTable(tableName = "stream_table_finish", streaming = true, withBatchLoad = true)
   }
 
   test("validate streaming property") {
@@ -192,6 +195,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists streaming.stream_table_delete")
     sql("drop table if exists streaming.stream_table_alter")
     sql("drop table if exists streaming.stream_table_handoff")
+    sql("drop table if exists streaming.stream_table_finish")
   }
 
   // normal table not support streaming ingest
@@ -721,6 +725,33 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     )
   }
 
+  test("alter table finish streaming") {
+    executeStreamingIngest(
+      tableName = "stream_table_finish",
+      batchNums = 6,
+      rowNumsEachBatch = 10000,
+      intervalOfSource = 5,
+      intervalOfIngest = 10,
+      continueSeconds = 40,
+      generateBadRecords = false,
+      badRecordAction = "force",
+      handoffSize = 1024L * 200
+    )
+    sql("alter table streaming.stream_table_finish finish streaming")
+
+    // streamed segments become "Streaming Finish"; the initial batch load stays "Success"
+    val segments = sql("show segments for table streaming.stream_table_finish").collect()
+    assertResult(4)(segments.length)
+    assertResult("Streaming Finish")(segments(0).getString(1))
+    assertResult("Streaming Finish")(segments(1).getString(1))
+    assertResult("Streaming Finish")(segments(2).getString(1))
+    assertResult("Success")(segments(3).getString(1))
+    checkAnswer(
+      sql("select count(*) from streaming.stream_table_finish"),
+      Seq(Row(5 + 6 * 10000))
+    )
+  }
+
   test("do not support creating datamap on streaming table") {
     assert(
       intercept[MalformedCarbonCommandException](

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026680a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index 24b7415..e662757 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -176,7 +176,6 @@ public class StoreCreator {
 
   private static CarbonTable createTable() throws IOException {
     TableInfo tableInfo = new TableInfo();
-    tableInfo.setTablePath(absoluteTableIdentifier.getTablePath());
     tableInfo.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
     TableSchema tableSchema = new TableSchema();
     tableSchema.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
@@ -263,6 +262,7 @@ public class StoreCreator {
     );
     tableInfo.setLastUpdatedTime(System.currentTimeMillis());
     tableInfo.setFactTable(tableSchema);
+    tableInfo.setTablePath(absoluteTableIdentifier.getTablePath());
 
     CarbonTablePath carbonTablePath = CarbonStorePath
         .getCarbonTablePath(absoluteTableIdentifier.getTablePath(),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026680a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index d733a74..7a62183 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -29,7 +29,9 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.locks.CarbonLockFactory;
 import org.apache.carbondata.core.locks.ICarbonLock;
+import org.apache.carbondata.core.locks.LockUsage;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.reader.CarbonIndexFileReader;
 import org.apache.carbondata.core.statusmanager.FileFormat;
@@ -181,6 +183,70 @@ public class StreamSegment {
   }
 
   /**
+   * Change the status of every "streaming" segment of the table to "streaming finish".
+   * The streaming lock is held for the whole operation and the table status lock is held
+   * while the table status file is read, updated and written back; each lock is released
+   * only if it was actually acquired.
+   *
+   * @param carbonTable table whose streaming segments should be finished
+   * @throws IOException if the streaming lock or the table status lock cannot be acquired
+   */
+  public static void finishStreaming(CarbonTable carbonTable) throws IOException {
+    String tableName = carbonTable.getDatabaseName() + "." + carbonTable.getTableName();
+    ICarbonLock streamingLock = CarbonLockFactory.getCarbonLockObj(
+        carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
+        LockUsage.STREAMING_LOCK);
+    if (!streamingLock.lockWithRetries()) {
+      String msg = "Failed to finish streaming, because streaming is locked for table " +
+          tableName;
+      LOGGER.error(msg);
+      throw new IOException(msg);
+    }
+    try {
+      ICarbonLock statusLock = CarbonLockFactory.getCarbonLockObj(
+          carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
+          LockUsage.TABLE_STATUS_LOCK);
+      if (!statusLock.lockWithRetries()) {
+        String msg = "Failed to acquire table status lock of " + tableName;
+        LOGGER.error(msg);
+        throw new IOException(msg);
+      }
+      try {
+        LoadMetadataDetails[] details =
+            SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath());
+        boolean updated = false;
+        for (LoadMetadataDetails detail : details) {
+          if (SegmentStatus.STREAMING == detail.getSegmentStatus()) {
+            detail.setLoadEndTime(System.currentTimeMillis());
+            detail.setSegmentStatus(SegmentStatus.STREAMING_FINISH);
+            updated = true;
+          }
+        }
+        if (updated) {
+          CarbonTablePath tablePath =
+              CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier());
+          SegmentStatusManager.writeLoadDetailsIntoFile(
+              tablePath.getTableStatusFilePath(), details);
+        }
+      } finally {
+        if (statusLock.unlock()) {
+          LOGGER.info("Table unlocked successfully after table status updation " + tableName);
+        } else {
+          LOGGER.error("Unable to unlock table status lock for table " + tableName +
+              " during table status updation");
+        }
+      }
+    } finally {
+      if (streamingLock.unlock()) {
+        LOGGER.info("Table unlocked successfully after streaming finished " + tableName);
+      } else {
+        LOGGER.error("Unable to unlock Table lock for table " + tableName +
+            " during streaming finished");
+      }
+    }
+  }
+
+  /**
    * invoke CarbonStreamOutputFormat to append batch data to existing carbondata file
    */
   public static void appendBatchData(CarbonIterator<Object[]> inputIterators,