You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/08/24 06:10:34 UTC

carbondata git commit: [CARBONDATA-1342] Fixed bugs for spark conf property and debugging in windows

Repository: carbondata
Updated Branches:
  refs/heads/master af5fdcb92 -> 2a205a554


[CARBONDATA-1342] Fixed bugs for spark conf property and debugging in windows

Fixes include:

In Spark 2, once the Spark conf has been set on a Spark context it cannot be modified through that same context. Therefore the property is no longer set in the Spark conf; it is read directly from CarbonProperties instead.
Fixed a bug when running CarbonSessionExample on Windows by replacing File.separator with CarbonCommonConstants.FILE_SEPARATOR.
Removed the namenode call made after each data load to rename bad record folders; the rename now happens only when bad record logging or bad record redirect is enabled.

This closes #1213


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2a205a55
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2a205a55
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2a205a55

Branch: refs/heads/master
Commit: 2a205a5546692b3fc78ce9e3d51ba085da5d7000
Parents: af5fdcb
Author: manishgupta88 <to...@gmail.com>
Authored: Sun Jul 30 16:12:18 2017 +0800
Committer: Ravindra Pesala <ra...@gmail.com>
Committed: Thu Aug 24 11:40:08 2017 +0530

----------------------------------------------------------------------
 .../carbondata/core/locks/ZooKeeperLocking.java |  7 +++---
 .../core/metadata/AbsoluteTableIdentifier.java  |  6 ++---
 .../carbondata/spark/util/CommonUtil.scala      | 10 +++-----
 .../steps/DataConverterProcessorStepImpl.java   | 25 ++++++++++++--------
 .../csvbased/BadRecordsLogger.java              |  8 +++++++
 5 files changed, 32 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a205a55/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java b/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java
index 5e83aa4..256c059 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java
@@ -17,7 +17,6 @@
 
 package org.apache.carbondata.core.locks;
 
-import java.io.File;
 import java.util.Collections;
 import java.util.List;
 
@@ -70,8 +69,8 @@ public class ZooKeeperLocking extends AbstractCarbonLock {
   private String lockTypeFolder;
 
   public ZooKeeperLocking(CarbonTableIdentifier tableIdentifier, String lockFile) {
-    this(tableIdentifier.getDatabaseName() + File.separator + tableIdentifier.getTableName(),
-        lockFile);
+    this(tableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier
+        .getTableName(), lockFile);
   }
 
   public static void initialize() {
@@ -129,7 +128,7 @@ public class ZooKeeperLocking extends AbstractCarbonLock {
   private void createRecursivly(String path) throws KeeperException, InterruptedException {
     try {
       if (zk.exists(path, true) == null && path.length() > 0) {
-        String temp = path.substring(0, path.lastIndexOf(File.separator));
+        String temp = path.substring(0, path.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR));
         createRecursivly(temp);
         zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
       } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a205a55/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
index 31ad03b..1fbf544 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
@@ -16,7 +16,6 @@
  */
 package org.apache.carbondata.core.metadata;
 
-import java.io.File;
 import java.io.Serializable;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -99,8 +98,9 @@ public class AbsoluteTableIdentifier implements Serializable {
   }
 
   public String getTablePath() {
-    return getStorePath() + File.separator + getCarbonTableIdentifier().getDatabaseName() +
-        File.separator + getCarbonTableIdentifier().getTableName();
+    return getStorePath() + CarbonCommonConstants.FILE_SEPARATOR + getCarbonTableIdentifier()
+        .getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + getCarbonTableIdentifier()
+        .getTableName();
   }
 
   public String appendWithLocalPrefix(String path) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a205a55/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 4d781a1..6f5c85b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -713,13 +713,9 @@ object CommonUtil {
    * @param sparkContext
    */
   def cleanInProgressSegments(storePath: String, sparkContext: SparkContext): Unit = {
-    val prop = CarbonProperties.getInstance().
-      getProperty(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER)
-    if (prop != null) {
-      sparkContext.getConf.set(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER, prop)
-    }
-    val loaderDriver = sparkContext.getConf.get(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER,
-      CarbonCommonConstants.DATA_MANAGEMENT_DRIVER_DEFAULT).toBoolean
+    val loaderDriver = CarbonProperties.getInstance().
+      getProperty(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER,
+        CarbonCommonConstants.DATA_MANAGEMENT_DRIVER_DEFAULT).toBoolean
     if (!loaderDriver) {
       return
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a205a55/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
index c46ea25..de96cc9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
@@ -180,7 +180,7 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
     if (!closed) {
       if (null != badRecordLogger) {
         badRecordLogger.closeStreams();
-        renameBadRecord(configuration);
+        renameBadRecord(badRecordLogger, configuration);
       }
       super.close();
       if (converters != null) {
@@ -195,21 +195,26 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
       configuration, RowConverter converter) {
     if (badRecordLogger != null) {
       badRecordLogger.closeStreams();
-      renameBadRecord(configuration);
+      renameBadRecord(badRecordLogger, configuration);
     }
     if (converter != null) {
       converter.finish();
     }
   }
 
-  private static void renameBadRecord(CarbonDataLoadConfiguration configuration) {
-    // rename the bad record in progress to normal
-    CarbonTableIdentifier identifier =
-        configuration.getTableIdentifier().getCarbonTableIdentifier();
-    CarbonDataProcessorUtil.renameBadRecordsFromInProgressToNormal(configuration,
-        identifier.getDatabaseName() + File.separator + identifier.getTableName()
-            + File.separator + configuration.getSegmentId() + File.separator + configuration
-            .getTaskNo());
+  private static void renameBadRecord(BadRecordsLogger badRecordLogger,
+      CarbonDataLoadConfiguration configuration) {
+    // the rename operation should be performed only when either the bad records logger is
+    // enabled or bad records redirect is enabled
+    if (badRecordLogger.isBadRecordLoggerEnable() || badRecordLogger.isBadRecordsLogRedirect()) {
+      // rename the bad record in progress to normal
+      CarbonTableIdentifier identifier =
+          configuration.getTableIdentifier().getCarbonTableIdentifier();
+      CarbonDataProcessorUtil.renameBadRecordsFromInProgressToNormal(configuration,
+          identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+              .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getSegmentId()
+              + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo());
+    }
   }
 
   @Override protected String getStepName() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a205a55/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordsLogger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordsLogger.java b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordsLogger.java
index 66b6e71..b93fcb7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordsLogger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordsLogger.java
@@ -259,6 +259,14 @@ public class BadRecordsLogger {
     return isDataLoadFail;
   }
 
+  public boolean isBadRecordLoggerEnable() {
+    return badRecordLoggerEnable;
+  }
+
+  public boolean isBadRecordsLogRedirect() {
+    return badRecordsLogRedirect;
+  }
+
   /**
    * closeStreams void
    */