Posted to commits@carbondata.apache.org by qi...@apache.org on 2020/07/10 10:28:05 UTC

[carbondata] branch master updated: [CARBONDATA-3834] Fix the partition table issues while 'carbon.merge.index.in.segment' is false

This is an automated email from the ASF dual-hosted git repository.

qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a20e9f  [CARBONDATA-3834] Fix the partition table issues while 'carbon.merge.index.in.segment' is false
6a20e9f is described below

commit 6a20e9f5e5a98f185cd213de1799d8d7cc0dc1ef
Author: Venu Reddy <k....@gmail.com>
AuthorDate: Wed May 27 19:00:56 2020 +0530

    [CARBONDATA-3834] Fix the partition table issues while 'carbon.merge.index.in.segment' is false
    
    Why is this PR needed?
    When the 'carbon.merge.index.in.segment' property is set to false, the segment directory and the segment file in metadata are not created for a partitioned table. In addition, the index files present in each partition's '.tmp' directory are deleted without first being moved to the partition directory where the corresponding '.carbondata' files exist.
    As a result, all queries fail because the segment and index files are missing.
    
    This issue was introduced by an earlier optimization, PR #3535.
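    
    A minimal repro sketch, assuming the sql(...) helper used by the test suites added in
    this PR; the table name below is illustrative only:
    
        CarbonProperties.getInstance()
          .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
        sql("CREATE TABLE repro_par (a INT, b INT) PARTITIONED BY (country STRING) " +
          "STORED AS carbondata")
        sql("INSERT INTO repro_par PARTITION(country='India') SELECT 1,2")
        // Before this fix, the query below failed because the segment file and the
        // partition's .carbonindex files were missing after the load.
        sql("SELECT * FROM repro_par").show()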
    
    What changes were proposed in this PR?
    When the 'carbon.merge.index.in.segment' property is false, create the segment directory and segment file, and move the index files from each partition's temp directory to the partition directory where the .carbondata files exist.
    Also fix the handling of an invalid 'carbon.timeseries.first.day.of.week' value: previously a NullPointerException was thrown instead of falling back to the default value.
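    
    A sketch (in Scala, paraphrasing the Java committer code in this PR) of how the new
    SegmentFileStore.writeSegmentFile overload is used when merge index is disabled; the
    paths, segment id, timestamp and index file names below are illustrative assumptions,
    not values taken from the PR:
    
        import java.util
        import org.apache.carbondata.core.constants.CarbonCommonConstants
        import org.apache.carbondata.core.metadata.SegmentFileStore
        import org.apache.carbondata.core.util.CarbonProperties
    
        // Illustrative inputs; the real code obtains them from CarbonLoadModel and the
        // job configuration inside CarbonOutputCommitter.commitJobForPartition.
        val tablePath = "/warehouse/db/repro_par"
        val segmentId = "0"
        val factTimeStamp = "1590584456000"
        val partitionList: util.List[String] =
          util.Arrays.asList(tablePath + "/country=India", tablePath + "/country=China")
        val indexFileNameMap = new util.HashMap[String, util.Set[String]]()
        indexFileNameMap.put(tablePath + "/country=India",
          new util.HashSet[String](util.Arrays.asList("example_0.carbonindex")))
        indexFileNameMap.put(tablePath + "/country=China",
          new util.HashSet[String](util.Arrays.asList("example_1.carbonindex")))
    
        val isMergeIndex = CarbonProperties.getInstance()
          .getProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
            CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean
        if (!isMergeIndex) {
          // Writes the segment file to the table's segment files location and moves the
          // index files out of each partition's '<segmentId>_<factTimeStamp>.tmp'
          // directory into the partition directory holding the .carbondata files.
          SegmentFileStore.writeSegmentFile(
            tablePath, segmentId, factTimeStamp, partitionList, indexFileNameMap)
        }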
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3776
---
 .../carbondata/core/metadata/SegmentFileStore.java | 52 +++++++++++++++
 .../carbondata/core/preagg/TimeSeriesUDF.java      |  6 +-
 .../hadoop/api/CarbonOutputCommitter.java          | 23 ++++++-
 .../datasources/SparkCarbonTableFormat.scala       | 74 +++++++++++++++++-----
 .../testsuite/iud/DeleteCarbonTableTestCase.scala  | 24 +++++++
 .../testsuite/iud/UpdateCarbonTableTestCase.scala  | 24 +++++++
 .../StandardPartitionTableLoadingTestCase.scala    | 19 ++++++
 7 files changed, 202 insertions(+), 20 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 2f274c3..545d5b6 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -28,11 +28,13 @@ import java.io.OutputStreamWriter;
 import java.io.Serializable;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -105,6 +107,56 @@ public class SegmentFileStore {
   }
 
   /**
+   * Method to create and write the segment file, removes the temporary directories from all the
+   * respective partition directories. This method is invoked only when {@link
+   * CarbonCommonConstants#CARBON_MERGE_INDEX_IN_SEGMENT} is disabled.
+   * @param tablePath Table path
+   * @param segmentId Segment id
+   * @param timeStamp FactTimeStamp
+   * @param partitionNames Partition names list
+   * @param indexFileNames Index files map with partition as key and index file names set as value
+   * @throws IOException
+   */
+  public static void writeSegmentFile(String tablePath, String segmentId, String timeStamp,
+      List<String> partitionNames, Map<String, Set<String>> indexFileNames) throws IOException {
+    SegmentFileStore.SegmentFile finalSegmentFile = null;
+    boolean isRelativePath;
+    String partitionLoc;
+    for (String partition : partitionNames) {
+      isRelativePath = false;
+      partitionLoc = partition;
+      if (partitionLoc.startsWith(tablePath)) {
+        partitionLoc = partitionLoc.substring(tablePath.length());
+        isRelativePath = true;
+      }
+      SegmentFileStore.SegmentFile segmentFile = new SegmentFileStore.SegmentFile();
+      SegmentFileStore.FolderDetails folderDetails = new SegmentFileStore.FolderDetails();
+      folderDetails.setFiles(indexFileNames.get(partition));
+      folderDetails.setPartitions(
+          Collections.singletonList(partitionLoc.substring(partitionLoc.indexOf("/") + 1)));
+      folderDetails.setRelative(isRelativePath);
+      folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+      segmentFile.getLocationMap().put(partitionLoc, folderDetails);
+      if (finalSegmentFile != null) {
+        finalSegmentFile = finalSegmentFile.merge(segmentFile);
+      } else {
+        finalSegmentFile = segmentFile;
+      }
+    }
+    Objects.requireNonNull(finalSegmentFile);
+    String segmentFilesLocation = CarbonTablePath.getSegmentFilesLocation(tablePath);
+    CarbonFile locationFile = FileFactory.getCarbonFile(segmentFilesLocation);
+    if (!locationFile.exists()) {
+      locationFile.mkdirs();
+    }
+    String segmentFileName = SegmentFileStore.genSegmentFileName(segmentId, timeStamp);
+    SegmentFileStore.writeSegmentFile(finalSegmentFile,
+        segmentFilesLocation + "/" + segmentFileName + CarbonTablePath.SEGMENT_EXT);
+    SegmentFileStore
+        .moveFromTempFolder(finalSegmentFile, segmentId + "_" + timeStamp + ".tmp", tablePath);
+  }
+
+  /**
    * Write segment information to the segment folder with indexfilename and
    * corresponding partitions.
    */
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
index df03bcc..36c0260 100644
--- a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
@@ -176,9 +176,9 @@ public class TimeSeriesUDF {
     } catch (IllegalArgumentException ex) {
       LOGGER.warn("Invalid value set for first of the week. Considering the default value as: "
           + CarbonCommonConstants.CARBON_TIMESERIES_FIRST_DAY_OF_WEEK_DEFAULT);
-      firstDayOfWeek = DaysOfWeekEnum.valueOf(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.CARBON_TIMESERIES_FIRST_DAY_OF_WEEK_DEFAULT)
-          .toUpperCase()).getOrdinal();
+      firstDayOfWeek =
+          DaysOfWeekEnum.valueOf(CarbonCommonConstants.CARBON_TIMESERIES_FIRST_DAY_OF_WEEK_DEFAULT)
+              .getOrdinal();
     }
     calanderThreadLocal.get().setFirstDayOfWeek(firstDayOfWeek);
   }
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 4b8fc43..6aa3067 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -40,6 +41,7 @@ import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonSessionInfo;
+import org.apache.carbondata.core.util.ObjectSerializationUtil;
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.OperationContext;
@@ -254,7 +256,8 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
   private void commitJobForPartition(JobContext context, boolean overwriteSet,
       CarbonLoadModel loadModel, String partitionPath) throws IOException {
     String size = context.getConfiguration().get("carbon.datasize", "");
-    if (size.equalsIgnoreCase("0")) {
+    String indexSize = context.getConfiguration().get("carbon.indexsize", "");
+    if (size.equalsIgnoreCase("0") || indexSize.equalsIgnoreCase("0")) {
       CarbonLoaderUtil.updateTableStatusForFailure(loadModel);
       return;
     }
@@ -270,6 +273,18 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
       uuid = operationContext.getProperty("uuid").toString();
     }
     String tempFolderPath = loadModel.getSegmentId() + "_" + loadModel.getFactTimeStamp() + ".tmp";
+    boolean isMergeIndex = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+            CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT));
+    if (!isMergeIndex) {
+      Map<String, Set<String>> indexFileNameMap = (Map<String, Set<String>>) ObjectSerializationUtil
+          .convertStringToObject(context.getConfiguration().get("carbon.index.files.name"));
+      List<String> partitionList =
+          (List<String>) ObjectSerializationUtil.convertStringToObject(partitionPath);
+      SegmentFileStore.writeSegmentFile(loadModel.getTablePath(), loadModel.getSegmentId(),
+          String.valueOf(loadModel.getFactTimeStamp()), partitionList, indexFileNameMap);
+      tempFolderPath = null;
+    }
     if (operationContext != null) {
       operationContext.setProperty("partitionPath", partitionPath);
       operationContext.setProperty("tempPath", tempFolderPath);
@@ -288,7 +303,11 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
     String segmentFileName = SegmentFileStore.genSegmentFileName(
         loadModel.getSegmentId(), String.valueOf(loadModel.getFactTimeStamp()));
     newMetaEntry.setSegmentFile(segmentFileName + CarbonTablePath.SEGMENT_EXT);
-    newMetaEntry.setIndexSize("" + loadModel.getMetrics().getMergeIndexSize());
+    if (isMergeIndex) {
+      newMetaEntry.setIndexSize("" + loadModel.getMetrics().getMergeIndexSize());
+    } else if (!StringUtils.isEmpty(indexSize)) {
+      newMetaEntry.setIndexSize(indexSize);
+    }
     if (!StringUtils.isEmpty(size)) {
       newMetaEntry.setDataSize(size);
     }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index 225daf5..d9c2633 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -254,6 +254,8 @@ case class CarbonSQLHadoopMapReduceCommitProtocol(jobId: String, path: String, i
       taskCommits: Seq[TaskCommitMessage]): Unit = {
     if (isCarbonDataFlow(jobContext.getConfiguration)) {
       var dataSize = 0L
+      var indexLen = 0L
+      val indexFileNameMap = new util.HashMap[String, util.Set[String]]()
       val partitions =
         taskCommits
           .flatMap { taskCommit =>
@@ -264,6 +266,24 @@ case class CarbonSQLHadoopMapReduceCommitProtocol(jobId: String, path: String, i
                 if (size.isDefined) {
                   dataSize = dataSize + java.lang.Long.parseLong(size.get)
                 }
+                val indexSize = map.get("carbon.indexsize")
+                if (indexSize.isDefined) {
+                  indexLen = indexLen + java.lang.Long.parseLong(indexSize.get)
+                }
+                val indexFiles = map.get("carbon.index.files.name")
+                if (indexFiles.isDefined) {
+                  val indexMap = ObjectSerializationUtil
+                    .convertStringToObject(indexFiles.get)
+                    .asInstanceOf[util.HashMap[String, Set[String]]]
+                  indexMap.asScala.foreach { e =>
+                    var values: util.Set[String] = indexFileNameMap.get(e._1)
+                    if (values == null) {
+                      values = new util.HashSet[String]()
+                      indexFileNameMap.put(e._1, values)
+                    }
+                    values.addAll(e._2.asInstanceOf[util.Set[String]])
+                  }
+                }
                 if (partition.isDefined) {
                   ObjectSerializationUtil
                     .convertStringToObject(partition.get)
@@ -283,13 +303,18 @@ case class CarbonSQLHadoopMapReduceCommitProtocol(jobId: String, path: String, i
         "carbon.output.partitions.name",
         ObjectSerializationUtil.convertObjectToString(partitions))
       jobContext.getConfiguration.set("carbon.datasize", dataSize.toString)
-
+      jobContext.getConfiguration.set("carbon.indexsize", indexLen.toString)
+      jobContext.getConfiguration
+        .set("carbon.index.files.name",
+          ObjectSerializationUtil.convertObjectToString(indexFileNameMap))
       val newTaskCommits = taskCommits.map { taskCommit =>
         taskCommit.obj match {
           case (map: Map[String, String], set) =>
             new TaskCommitMessage(
-              map
-                .filterNot(e => "carbon.partitions".equals(e._1) || "carbon.datasize".equals(e._1)),
+              map.filterNot(e => "carbon.partitions".equals(e._1) ||
+                                 "carbon.datasize".equals(e._1) ||
+                                 "carbon.indexsize".equals(e._1) ||
+                                 "carbon.index.files.name".equals(e._1)),
               set)
           case _ => taskCommit
         }
@@ -313,27 +338,46 @@ case class CarbonSQLHadoopMapReduceCommitProtocol(jobId: String, path: String, i
       ThreadLocalSessionInfo.unsetAll()
       val partitions: String = taskContext.getConfiguration.get("carbon.output.partitions.name", "")
       val files = taskContext.getConfiguration.get("carbon.output.files.name", "")
+      val indexFileNameMap = new util.HashMap[String, util.Set[String]]()
       var sum = 0L
       var indexSize = 0L
-      if (!StringUtils.isEmpty(files)) {
-        val filesList = ObjectSerializationUtil
-          .convertStringToObject(files)
+      if (!StringUtils.isEmpty(partitions)) {
+        val partitionList = ObjectSerializationUtil
+          .convertStringToObject(partitions)
           .asInstanceOf[util.ArrayList[String]]
           .asScala
-        for (file <- filesList) {
-          if (file.contains(".carbondata")) {
-            sum += java.lang.Long.parseLong(file.substring(file.lastIndexOf(":") + 1))
-          } else if (file.contains(".carbonindex")) {
-            indexSize += java.lang.Long.parseLong(file.substring(file.lastIndexOf(":") + 1))
+        if (!StringUtils.isEmpty(files)) {
+          val filesList = ObjectSerializationUtil
+            .convertStringToObject(files)
+            .asInstanceOf[util.ArrayList[String]]
+            .asScala
+          for (file <- filesList) {
+            if (file.contains(".carbondata")) {
+              sum += java.lang.Long.parseLong(file.substring(file.lastIndexOf(":") + 1))
+            } else if (file.contains(".carbonindex")) {
+              val fileOffset = file.lastIndexOf(":")
+              indexSize += java.lang.Long.parseLong(file.substring(fileOffset + 1))
+              val absoluteFileName = file.substring(0, fileOffset)
+              val indexFileNameOffset = absoluteFileName.lastIndexOf("/")
+              val indexFileName = absoluteFileName.substring(indexFileNameOffset + 1)
+              val matchedPartition = partitionList.find(absoluteFileName.startsWith)
+              var values: util.Set[String] = indexFileNameMap.get(matchedPartition.get)
+              if (values == null) {
+                values = new util.HashSet[String]()
+                indexFileNameMap.put(matchedPartition.get, values)
+              }
+              values.add(indexFileName)
+            }
           }
         }
-      }
-      if (!StringUtils.isEmpty(partitions)) {
+        val indexFileNames = ObjectSerializationUtil.convertObjectToString(indexFileNameMap)
         taskMsg = taskMsg.obj match {
           case (map: Map[String, String], set) =>
             new TaskCommitMessage(
-              map ++ Map("carbon.partitions" -> partitions, "carbon.datasize" -> sum.toString),
-              set)
+              map ++ Map("carbon.partitions" -> partitions,
+                "carbon.datasize" -> sum.toString,
+                "carbon.indexsize" -> indexSize.toString,
+                "carbon.index.files.name" -> indexFileNames), set)
           case _ => taskMsg
         }
       }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
index b5580c6..1619dfc 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
@@ -147,6 +147,30 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     )
   }
 
+  test("test delete for partition table without merge index files for segment") {
+    try {
+      sql("DROP TABLE IF EXISTS iud_db.partition_nomerge_index")
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
+      sql(
+        s"""CREATE TABLE iud_db.partition_nomerge_index (a INT, b INT) PARTITIONED BY (country
+           |STRING) STORED AS carbondata"""
+          .stripMargin)
+      sql("INSERT INTO iud_db.partition_nomerge_index  PARTITION(country='India') SELECT 1,2")
+      sql("INSERT INTO iud_db.partition_nomerge_index  PARTITION(country='India') SELECT 3,4")
+      sql("INSERT INTO iud_db.partition_nomerge_index  PARTITION(country='China') SELECT 5,6")
+      sql("INSERT INTO iud_db.partition_nomerge_index  PARTITION(country='China') SELECT 7,8")
+      checkAnswer(sql("select * from iud_db.partition_nomerge_index"),
+        Seq(Row(1, 2, "India"), Row(3, 4, "India"), Row(5, 6, "China"), Row(7, 8, "China")))
+      sql("DELETE FROM iud_db.partition_nomerge_index WHERE b = 4")
+      checkAnswer(sql("select * from iud_db.partition_nomerge_index"),
+        Seq(Row(1, 2, "India"), Row(5, 6, "China"), Row(7, 8, "China")))
+    } finally {
+      CarbonProperties.getInstance()
+        .removeProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT)
+    }
+  }
+
   test("Records more than one pagesize after delete operation ") {
     sql("DROP TABLE IF EXISTS carbon2")
     import sqlContext.implicits._
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index 5fc15d8..2f455af 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -891,6 +891,30 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists test_return_row_count_source")
   }
 
+  test("test update for partition table without merge index files for segment") {
+    try {
+      sql("DROP TABLE IF EXISTS iud.partition_nomerge_index")
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
+      sql(
+        s"""CREATE TABLE iud.partition_nomerge_index (a INT, b INT) PARTITIONED BY (country
+           |STRING) STORED AS carbondata"""
+          .stripMargin)
+      sql("INSERT INTO iud.partition_nomerge_index  PARTITION(country='India') SELECT 1,2")
+      sql("INSERT INTO iud.partition_nomerge_index  PARTITION(country='India') SELECT 3,4")
+      sql("INSERT INTO iud.partition_nomerge_index  PARTITION(country='China') SELECT 5,6")
+      sql("INSERT INTO iud.partition_nomerge_index  PARTITION(country='China') SELECT 7,8")
+      checkAnswer(sql("select * from iud.partition_nomerge_index"),
+        Seq(Row(1, 2, "India"), Row(3, 4, "India"), Row(5, 6, "China"), Row(7, 8, "China")))
+      sql("UPDATE iud.partition_nomerge_index SET (b)=(1)")
+      checkAnswer(sql("select * from iud.partition_nomerge_index"),
+        Seq(Row(1, 1, "India"), Row(3, 1, "India"), Row(5, 1, "China"), Row(7, 1, "China")))
+    } finally {
+      CarbonProperties.getInstance()
+        .removeProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT)
+    }
+  }
+
   override def afterAll {
     sql("use default")
     sql("drop database  if exists iud cascade")
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index e23d375..5e2acaa 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -566,6 +566,25 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     assert(ex.getMessage().equalsIgnoreCase("Cannot use all columns for partition columns;"))
   }
 
+  test("test partition without merge index files for segment") {
+    try {
+      sql("DROP TABLE IF EXISTS new_par")
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
+      sql(
+        s"""CREATE TABLE new_par (a INT, b INT) PARTITIONED BY (country STRING) STORED AS
+           |carbondata""".stripMargin)
+      sql("INSERT INTO new_par PARTITION(country='India') SELECT 1,2")
+      sql("INSERT INTO new_par PARTITION(country='India') SELECT 3,4")
+      sql("INSERT INTO new_par PARTITION(country='China') SELECT 5,6")
+      sql("INSERT INTO new_par PARTITION(country='China') SELECT 7,8")
+      checkAnswer(sql("SELECT COUNT(*) FROM new_par"), Seq(Row(4)))
+    } finally {
+      CarbonProperties.getInstance()
+        .removeProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT)
+    }
+  }
+
   def verifyInsertForPartitionTable(tableName: String, sort_scope: String): Unit = {
     sql(s"drop table if exists $tableName")
     sql(