You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/01 10:05:36 UTC

[38/47] incubator-carbondata git commit: [Bug]Changed locking so that zookeeper lock or hdfs lock will be available at executor. (#885)

[Bug]Changed locking so that zookeeper lock or hdfs lock will be available at executor. (#885)

[Bug]Changed locking so that zookeeper lock or hdfs lock will be available at executor. (#885)

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/f495b6bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/f495b6bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/f495b6bd

Branch: refs/heads/master
Commit: f495b6bd4653b7bb3139377cd93b4d16425553a7
Parents: 066f74b
Author: ashokblend <as...@gmail.com>
Authored: Fri Jul 29 09:19:03 2016 +0530
Committer: david <qi...@qq.com>
Committed: Fri Jul 29 11:49:03 2016 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   | 10 ++++++
 .../org/carbondata/core/locks/HdfsFileLock.java |  9 +++++-
 .../carbondata/core/locks/ZooKeeperLocking.java | 33 ++++++++++++++++++--
 .../spark/sql/hive/CarbonMetastoreCatalog.scala | 15 ++++++---
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   | 18 +++++++++--
 .../spark/util/GlobalDictionaryUtil.scala       | 11 +++++--
 6 files changed, 82 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f495b6bd/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
index 86d87eb..b45d9b4 100644
--- a/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
@@ -854,6 +854,16 @@ public final class CarbonCommonConstants {
    */
   public static String COMPACTION_KEY_WORD = "COMPACTION";
 
+  /**
+   * hdfs temporary directory key
+   */
+  public static final String HDFS_TEMP_LOCATION = "hadoop.tmp.dir";
+
+  /**
+   * zookeeper url key
+   */
+  public static final String ZOOKEEPER_URL = "spark.deploy.zookeeper.url";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f495b6bd/core/src/main/java/org/carbondata/core/locks/HdfsFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/locks/HdfsFileLock.java b/core/src/main/java/org/carbondata/core/locks/HdfsFileLock.java
index e8f44ed..702c8a1 100644
--- a/core/src/main/java/org/carbondata/core/locks/HdfsFileLock.java
+++ b/core/src/main/java/org/carbondata/core/locks/HdfsFileLock.java
@@ -21,9 +21,12 @@ package org.carbondata.core.locks;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
 import org.carbondata.core.carbon.CarbonTableIdentifier;
 import org.carbondata.core.constants.CarbonCommonConstants;
 import org.carbondata.core.datastorage.store.impl.FileFactory;
+import org.carbondata.core.util.CarbonProperties;
 
 /**
  * This class is used to handle the HDFS File locking.
@@ -31,6 +34,8 @@ import org.carbondata.core.datastorage.store.impl.FileFactory;
  */
 public class HdfsFileLock extends AbstractCarbonLock {
 
+  private static final LogService LOGGER =
+             LogServiceFactory.getLogService(HdfsFileLock.class.getName());
   /**
    * location hdfs file location
    */
@@ -41,7 +46,8 @@ public class HdfsFileLock extends AbstractCarbonLock {
   public static String tmpPath;
 
   static {
-    tmpPath = System.getProperty("hadoop.tmp.dir");
+    tmpPath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION,
+               System.getProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION));
   }
 
   /**
@@ -51,6 +57,7 @@ public class HdfsFileLock extends AbstractCarbonLock {
   public HdfsFileLock(String lockFileLocation, String lockFile) {
     this.location = tmpPath + CarbonCommonConstants.FILE_SEPARATOR + lockFileLocation
         + CarbonCommonConstants.FILE_SEPARATOR + lockFile;
+    LOGGER.info("HDFS lock path:"+this.location);
     initRetry();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f495b6bd/core/src/main/java/org/carbondata/core/locks/ZooKeeperLocking.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/locks/ZooKeeperLocking.java b/core/src/main/java/org/carbondata/core/locks/ZooKeeperLocking.java
index e1db3c6..3f309c3 100644
--- a/core/src/main/java/org/carbondata/core/locks/ZooKeeperLocking.java
+++ b/core/src/main/java/org/carbondata/core/locks/ZooKeeperLocking.java
@@ -18,6 +18,7 @@
  */
 package org.carbondata.core.locks;
 
+import java.io.File;
 import java.util.Collections;
 import java.util.List;
 
@@ -25,6 +26,7 @@ import org.carbondata.common.logging.LogService;
 import org.carbondata.common.logging.LogServiceFactory;
 import org.carbondata.core.carbon.CarbonTableIdentifier;
 import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.core.util.CarbonProperties;
 
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -69,7 +71,8 @@ public class ZooKeeperLocking extends AbstractCarbonLock {
   private String lockTypeFolder;
 
   public ZooKeeperLocking(CarbonTableIdentifier tableIdentifier, String lockFile) {
-    this(tableIdentifier.getDatabaseName() + '.' + tableIdentifier.getTableName(), lockFile);
+    this(tableIdentifier.getDatabaseName() + File.separator + tableIdentifier.getTableName(),
+        lockFile);
   }
 
   /**
@@ -80,7 +83,9 @@ public class ZooKeeperLocking extends AbstractCarbonLock {
     this.lockName = lockFile;
     this.tableIdFolder = zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + lockLocation;
 
-    zk = ZookeeperInit.getInstance().getZookeeper();
+    String zooKeeperUrl =
+        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ZOOKEEPER_URL);
+    zk = ZookeeperInit.getInstance(zooKeeperUrl).getZookeeper();
 
     this.lockTypeFolder = zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + lockLocation
         + CarbonCommonConstants.FILE_SEPARATOR + lockFile;
@@ -88,7 +93,7 @@ public class ZooKeeperLocking extends AbstractCarbonLock {
       createBaseNode();
       // if exists returns null then path doesnt exist. so creating.
       if (null == zk.exists(this.tableIdFolder, true)) {
-        zk.create(this.tableIdFolder, new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        createRecursivly(this.tableIdFolder);
       }
       // if exists returns null then path doesnt exist. so creating.
       if (null == zk.exists(this.lockTypeFolder, true)) {
@@ -111,6 +116,28 @@ public class ZooKeeperLocking extends AbstractCarbonLock {
   }
 
   /**
+   * Create the zookeeper node, and any missing parent nodes, if it does not exist
+   * @param path
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  private void createRecursivly(String path) throws KeeperException, InterruptedException {
+    try {
+      if (zk.exists(path, true) == null && path.length() > 0) {
+        String temp = path.substring(0, path.lastIndexOf(File.separator));
+        createRecursivly(temp);
+        zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      } else {
+        return;
+      }
+    } catch (KeeperException e) {
+      throw e;
+    } catch (InterruptedException e) {
+      throw e;
+    }
+
+  }
+  /**
    * Handling of the locking mechanism using zoo keeper.
    */
   @Override public boolean lock() {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f495b6bd/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
index 5966240..7ec1b87 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
@@ -207,14 +207,19 @@ class CarbonMetastoreCatalog(hive: HiveContext, val storePath: String, client: C
 
     // creating zookeeper instance once.
     // if zookeeper is configured as carbon lock type.
-    val zookeeperUrl: String = hive.getConf("spark.deploy.zookeeper.url", null)
+    val zookeeperUrl: String = hive.getConf(CarbonCommonConstants.ZOOKEEPER_URL, null)
     if (zookeeperUrl != null) {
+      CarbonProperties.getInstance.addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperUrl)
       ZookeeperInit.getInstance(zookeeperUrl)
       LOGGER.info("Zookeeper url is configured. Taking the zookeeper as lock type.")
-      CarbonProperties.getInstance
-        .addProperty(CarbonCommonConstants.LOCK_TYPE,
-          CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER
-        )
+      var configuredLockType = CarbonProperties.getInstance
+      .getProperty(CarbonCommonConstants.LOCK_TYPE)
+      if (null == configuredLockType) {
+        configuredLockType = CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER
+        CarbonProperties.getInstance
+            .addProperty(CarbonCommonConstants.LOCK_TYPE,
+                configuredLockType)
+      }
     }
 
     if (metadataPath == null) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f495b6bd/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index e0e8cbf..a1870e9 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -38,6 +38,7 @@ import org.carbondata.core.constants.CarbonCommonConstants
 import org.carbondata.core.datastorage.store.impl.FileFactory
 import org.carbondata.core.locks.CarbonLockFactory
 import org.carbondata.core.locks.LockUsage
+import org.carbondata.core.util.CarbonProperties
 import org.carbondata.core.util.CarbonTimeStatisticsFactory
 import org.carbondata.spark.load.{CarbonLoaderUtil, CarbonLoadModel}
 import org.carbondata.spark.partition.reader.{CSVParser, CSVReader}
@@ -154,7 +155,10 @@ case class DictionaryLoadModel(table: CarbonTableIdentifier,
     highCardThreshold: Int,
     rowCountPercentage: Double,
     columnIdentifier: Array[ColumnIdentifier],
-    isFirstLoad: Boolean) extends Serializable
+    isFirstLoad: Boolean,
+    hdfstemplocation: String,
+    lockType: String,
+    zooKeeperUrl: String) extends Serializable
 
 case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends Serializable
 
@@ -296,9 +300,16 @@ class CarbonGlobalDictionaryGenerateRDD(
       val pathService = CarbonCommonFactory.getPathService
       val carbonTablePath = pathService.getCarbonTablePath(model.columnIdentifier(split.index),
           model.hdfsLocation, model.table)
+      CarbonProperties.getInstance.addProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION,
+              model.hdfstemplocation)
+      CarbonProperties.getInstance.addProperty(CarbonCommonConstants.LOCK_TYPE,
+              model.lockType)
+       CarbonProperties.getInstance.addProperty(CarbonCommonConstants.ZOOKEEPER_URL,
+              model.zooKeeperUrl)
       val dictLock = CarbonLockFactory
         .getCarbonLockObj(carbonTablePath.getRelativeDictionaryDirectory,
           model.columnIdentifier(split.index).getColumnId + LockUsage.LOCK)
+      var isDictionaryLocked = false
       // generate distinct value list
       try {
         val t1 = System.currentTimeMillis
@@ -328,7 +339,8 @@ class CarbonGlobalDictionaryGenerateRDD(
           LOGGER.info("column " + model.table.getTableUniqueName + "." +
                       model.primDimensions(split.index).getColName + " is high cardinality column")
         } else {
-          if (dictLock.lockWithRetries()) {
+          isDictionaryLocked = dictLock.lockWithRetries()
+          if (isDictionaryLocked) {
             logInfo(s"Successfully able to get the dictionary lock for ${
               model.primDimensions(split.index).getColName
             }")
@@ -397,7 +409,7 @@ class CarbonGlobalDictionaryGenerateRDD(
         }
         org.carbondata.core.util.CarbonUtil.clearDictionaryCache(dictionaryForSortIndexWriting);
         if (dictLock != null) {
-          if (dictLock.unlock()) {
+          if (isDictionaryLocked && dictLock.unlock()) {
             logInfo(s"Dictionary ${
               model.primDimensions(split.index).getColName
             } Unlocked Successfully.")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f495b6bd/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
index 18e777d..ff7e360 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -297,7 +297,11 @@ object GlobalDictionaryUtil extends Logging {
     val dictFilePaths = dictDetail.dictFilePaths
     val dictFileExists = dictDetail.dictFileExists
     val columnIdentifier = dictDetail.columnIdentifiers
-
+    val hdfstemplocation = CarbonProperties.getInstance.
+                      getProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION)
+    val lockType = CarbonProperties.getInstance
+      .getProperty(CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS)
+    val zookeeperUrl = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.ZOOKEEPER_URL)
     // load high cardinality identify configure
     val highCardIdentifyEnable = CarbonProperties.getInstance().getProperty(
         CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE,
@@ -326,7 +330,10 @@ object GlobalDictionaryUtil extends Logging {
       highCardThreshold,
       rowCountPercentage,
       columnIdentifier,
-      carbonLoadModel.getLoadMetadataDetails.size() == 0)
+      carbonLoadModel.getLoadMetadataDetails.size() == 0,
+      hdfstemplocation,
+      lockType,
+      zookeeperUrl)
   }
 
   /**