You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/01 10:05:36 UTC
[38/47] incubator-carbondata git commit: [Bug] Changed locking so that
zookeeper lock or hdfs lock will be available at executor. (#885)
[Bug] Changed locking so that zookeeper lock or hdfs lock will be available at executor. (#885)
[Bug] Changed locking so that zookeeper lock or hdfs lock will be available at executor. (#885)
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/f495b6bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/f495b6bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/f495b6bd
Branch: refs/heads/master
Commit: f495b6bd4653b7bb3139377cd93b4d16425553a7
Parents: 066f74b
Author: ashokblend <as...@gmail.com>
Authored: Fri Jul 29 09:19:03 2016 +0530
Committer: david <qi...@qq.com>
Committed: Fri Jul 29 11:49:03 2016 +0800
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 10 ++++++
.../org/carbondata/core/locks/HdfsFileLock.java | 9 +++++-
.../carbondata/core/locks/ZooKeeperLocking.java | 33 ++++++++++++++++++--
.../spark/sql/hive/CarbonMetastoreCatalog.scala | 15 ++++++---
.../spark/rdd/CarbonGlobalDictionaryRDD.scala | 18 +++++++++--
.../spark/util/GlobalDictionaryUtil.scala | 11 +++++--
6 files changed, 82 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f495b6bd/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
index 86d87eb..b45d9b4 100644
--- a/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
@@ -854,6 +854,16 @@ public final class CarbonCommonConstants {
*/
public static String COMPACTION_KEY_WORD = "COMPACTION";
+ /**
+ * Key of the Hadoop temporary-directory setting ("hadoop.tmp.dir"). HdfsFileLock uses its
+ * value as the base directory for lock files, reading it from CarbonProperties first and
+ * passing the JVM system property of the same key as the fallback value.
+ */
+ public static final String HDFS_TEMP_LOCATION = "hadoop.tmp.dir";
+
+ /**
+ * Key of the Spark configuration entry holding the ZooKeeper connection URL.
+ * ZooKeeperLocking reads the URL from CarbonProperties under this key.
+ */
+ public static final String ZOOKEEPER_URL = "spark.deploy.zookeeper.url";
+
private CarbonCommonConstants() {
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f495b6bd/core/src/main/java/org/carbondata/core/locks/HdfsFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/locks/HdfsFileLock.java b/core/src/main/java/org/carbondata/core/locks/HdfsFileLock.java
index e8f44ed..702c8a1 100644
--- a/core/src/main/java/org/carbondata/core/locks/HdfsFileLock.java
+++ b/core/src/main/java/org/carbondata/core/locks/HdfsFileLock.java
@@ -21,9 +21,12 @@ package org.carbondata.core.locks;
import java.io.DataOutputStream;
import java.io.IOException;
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
import org.carbondata.core.carbon.CarbonTableIdentifier;
import org.carbondata.core.constants.CarbonCommonConstants;
import org.carbondata.core.datastorage.store.impl.FileFactory;
+import org.carbondata.core.util.CarbonProperties;
/**
* This class is used to handle the HDFS File locking.
@@ -31,6 +34,8 @@ import org.carbondata.core.datastorage.store.impl.FileFactory;
*/
public class HdfsFileLock extends AbstractCarbonLock {
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(HdfsFileLock.class.getName());
/**
* location hdfs file location
*/
@@ -41,7 +46,8 @@ public class HdfsFileLock extends AbstractCarbonLock {
public static String tmpPath;
static {
- tmpPath = System.getProperty("hadoop.tmp.dir");
+ // Base directory for HDFS lock files: the value configured in CarbonProperties under
+ // HDFS_TEMP_LOCATION ("hadoop.tmp.dir"), with the JVM system property of the same key
+ // passed as the fallback value.
+ // NOTE(review): this runs once, when the class is initialised, so a value added to
+ // CarbonProperties after that point is not picked up. If neither source defines the key,
+ // tmpPath is presumably null and the constructor's string concatenation would build a
+ // lock location starting with "null" - confirm whether a default directory is needed.
+ tmpPath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION,
+ System.getProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION));
}
/**
@@ -51,6 +57,7 @@ public class HdfsFileLock extends AbstractCarbonLock {
public HdfsFileLock(String lockFileLocation, String lockFile) {
this.location = tmpPath + CarbonCommonConstants.FILE_SEPARATOR + lockFileLocation
+ CarbonCommonConstants.FILE_SEPARATOR + lockFile;
+ // Log the resolved lock file location (tmpPath, lockFileLocation and lockFile joined by
+ // FILE_SEPARATOR) so the path actually used by this process can be checked in the logs.
+ LOGGER.info("HDFS lock path:"+this.location);
initRetry();
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f495b6bd/core/src/main/java/org/carbondata/core/locks/ZooKeeperLocking.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/locks/ZooKeeperLocking.java b/core/src/main/java/org/carbondata/core/locks/ZooKeeperLocking.java
index e1db3c6..3f309c3 100644
--- a/core/src/main/java/org/carbondata/core/locks/ZooKeeperLocking.java
+++ b/core/src/main/java/org/carbondata/core/locks/ZooKeeperLocking.java
@@ -18,6 +18,7 @@
*/
package org.carbondata.core.locks;
+import java.io.File;
import java.util.Collections;
import java.util.List;
@@ -25,6 +26,7 @@ import org.carbondata.common.logging.LogService;
import org.carbondata.common.logging.LogServiceFactory;
import org.carbondata.core.carbon.CarbonTableIdentifier;
import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.core.util.CarbonProperties;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -69,7 +71,8 @@ public class ZooKeeperLocking extends AbstractCarbonLock {
private String lockTypeFolder;
public ZooKeeperLocking(CarbonTableIdentifier tableIdentifier, String lockFile) {
- this(tableIdentifier.getDatabaseName() + '.' + tableIdentifier.getTableName(), lockFile);
+    // Join database and table with the separator this class also uses to build ZooKeeper
+    // node paths (CarbonCommonConstants.FILE_SEPARATOR), not File.separator: ZooKeeper paths
+    // are always '/'-separated, while File.separator is a backslash on Windows.
+    this(tableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR
+        + tableIdentifier.getTableName(), lockFile);
}
/**
@@ -80,7 +83,9 @@ public class ZooKeeperLocking extends AbstractCarbonLock {
this.lockName = lockFile;
this.tableIdFolder = zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + lockLocation;
- zk = ZookeeperInit.getInstance().getZookeeper();
+ String zooKeeperUrl =
+ CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ZOOKEEPER_URL);
+ zk = ZookeeperInit.getInstance(zooKeeperUrl).getZookeeper();
this.lockTypeFolder = zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + lockLocation
+ CarbonCommonConstants.FILE_SEPARATOR + lockFile;
@@ -88,7 +93,7 @@ public class ZooKeeperLocking extends AbstractCarbonLock {
createBaseNode();
// if exists returns null then path doesnt exist. so creating.
if (null == zk.exists(this.tableIdFolder, true)) {
- zk.create(this.tableIdFolder, new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ createRecursivly(this.tableIdFolder);
}
// if exists returns null then path doesnt exist. so creating.
if (null == zk.exists(this.lockTypeFolder, true)) {
@@ -111,6 +116,28 @@ public class ZooKeeperLocking extends AbstractCarbonLock {
}
/**
+   * Creates the persistent ZooKeeper node at the given path, first creating any missing
+   * ancestor nodes, each with null data and open ACLs.
+   * <p>
+   * ZooKeeper node paths are always '/'-separated whatever the local OS, so the parent path
+   * is derived with '/' rather than {@link File#separator}. On Windows, File.separator would
+   * not match any position in the path.
+   * The empty path and the root "/" are treated as already existing. A node that another
+   * client creates between the existence check and the create call is also accepted, so
+   * concurrent lock initialisation from several processes does not fail.
+   *
+   * @param path absolute node path, e.g. "/a/b/c"
+   * @throws KeeperException if a ZooKeeper operation fails for a reason other than the node
+   *         already existing
+   * @throws InterruptedException if a ZooKeeper call is interrupted
+   */
+  private void createRecursivly(String path) throws KeeperException, InterruptedException {
+    // Check the empty path before calling exists(): ZooKeeper rejects "" as an invalid path.
+    if (path.isEmpty() || "/".equals(path) || zk.exists(path, true) != null) {
+      return;
+    }
+    int parentEnd = path.lastIndexOf('/');
+    if (parentEnd > 0) {
+      createRecursivly(path.substring(0, parentEnd));
+    }
+    try {
+      zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    } catch (KeeperException.NodeExistsException ignored) {
+      // Another client created the node after our exists() check; the goal is already met.
+    }
+  }
+ /**
* Handling of the locking mechanism using zoo keeper.
*/
@Override public boolean lock() {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f495b6bd/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
index 5966240..7ec1b87 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
@@ -207,14 +207,19 @@ class CarbonMetastoreCatalog(hive: HiveContext, val storePath: String, client: C
// creating zookeeper instance once.
// if zookeeper is configured as carbon lock type.
- val zookeeperUrl: String = hive.getConf("spark.deploy.zookeeper.url", null)
+ val zookeeperUrl: String = hive.getConf(CarbonCommonConstants.ZOOKEEPER_URL, null)
if (zookeeperUrl != null) {
+ CarbonProperties.getInstance.addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperUrl)
ZookeeperInit.getInstance(zookeeperUrl)
LOGGER.info("Zookeeper url is configured. Taking the zookeeper as lock type.")
- CarbonProperties.getInstance
- .addProperty(CarbonCommonConstants.LOCK_TYPE,
- CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER
- )
+ var configuredLockType = CarbonProperties.getInstance
+ .getProperty(CarbonCommonConstants.LOCK_TYPE)
+ if (null == configuredLockType) {
+ configuredLockType = CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER
+ CarbonProperties.getInstance
+ .addProperty(CarbonCommonConstants.LOCK_TYPE,
+ configuredLockType)
+ }
}
if (metadataPath == null) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f495b6bd/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index e0e8cbf..a1870e9 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -38,6 +38,7 @@ import org.carbondata.core.constants.CarbonCommonConstants
import org.carbondata.core.datastorage.store.impl.FileFactory
import org.carbondata.core.locks.CarbonLockFactory
import org.carbondata.core.locks.LockUsage
+import org.carbondata.core.util.CarbonProperties
import org.carbondata.core.util.CarbonTimeStatisticsFactory
import org.carbondata.spark.load.{CarbonLoaderUtil, CarbonLoadModel}
import org.carbondata.spark.partition.reader.{CSVParser, CSVReader}
@@ -154,7 +155,10 @@ case class DictionaryLoadModel(table: CarbonTableIdentifier,
highCardThreshold: Int,
rowCountPercentage: Double,
columnIdentifier: Array[ColumnIdentifier],
- isFirstLoad: Boolean) extends Serializable
+ isFirstLoad: Boolean,
+ hdfstemplocation: String,
+ lockType: String,
+ zooKeeperUrl: String) extends Serializable
case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends Serializable
@@ -296,9 +300,16 @@ class CarbonGlobalDictionaryGenerateRDD(
val pathService = CarbonCommonFactory.getPathService
val carbonTablePath = pathService.getCarbonTablePath(model.columnIdentifier(split.index),
model.hdfsLocation, model.table)
+ CarbonProperties.getInstance.addProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION,
+ model.hdfstemplocation)
+ CarbonProperties.getInstance.addProperty(CarbonCommonConstants.LOCK_TYPE,
+ model.lockType)
+ CarbonProperties.getInstance.addProperty(CarbonCommonConstants.ZOOKEEPER_URL,
+ model.zooKeeperUrl)
val dictLock = CarbonLockFactory
.getCarbonLockObj(carbonTablePath.getRelativeDictionaryDirectory,
model.columnIdentifier(split.index).getColumnId + LockUsage.LOCK)
+ var isDictionaryLocked = false
// generate distinct value list
try {
val t1 = System.currentTimeMillis
@@ -328,7 +339,8 @@ class CarbonGlobalDictionaryGenerateRDD(
LOGGER.info("column " + model.table.getTableUniqueName + "." +
model.primDimensions(split.index).getColName + " is high cardinality column")
} else {
- if (dictLock.lockWithRetries()) {
+ isDictionaryLocked = dictLock.lockWithRetries()
+ if (isDictionaryLocked) {
logInfo(s"Successfully able to get the dictionary lock for ${
model.primDimensions(split.index).getColName
}")
@@ -397,7 +409,7 @@ class CarbonGlobalDictionaryGenerateRDD(
}
org.carbondata.core.util.CarbonUtil.clearDictionaryCache(dictionaryForSortIndexWriting);
if (dictLock != null) {
- if (dictLock.unlock()) {
+ if (isDictionaryLocked && dictLock.unlock()) {
logInfo(s"Dictionary ${
model.primDimensions(split.index).getColName
} Unlocked Successfully.")
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f495b6bd/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
index 18e777d..ff7e360 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -297,7 +297,11 @@ object GlobalDictionaryUtil extends Logging {
val dictFilePaths = dictDetail.dictFilePaths
val dictFileExists = dictDetail.dictFileExists
val columnIdentifier = dictDetail.columnIdentifiers
-
+ val hdfstemplocation = CarbonProperties.getInstance.
+ getProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION)
+ val lockType = CarbonProperties.getInstance
+ .getProperty(CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS)
+ val zookeeperUrl = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.ZOOKEEPER_URL)
// load high cardinality identify configure
val highCardIdentifyEnable = CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE,
@@ -326,7 +330,10 @@ object GlobalDictionaryUtil extends Logging {
highCardThreshold,
rowCountPercentage,
columnIdentifier,
- carbonLoadModel.getLoadMetadataDetails.size() == 0)
+ carbonLoadModel.getLoadMetadataDetails.size() == 0,
+ hdfstemplocation,
+ lockType,
+ zookeeperUrl)
}
/**