You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by bi...@apache.org on 2017/02/04 04:44:02 UTC
[33/47] kylin git commit: Add Zookeeper Lock
Add Zookeeper Lock
Signed-off-by: shaofengshi <sh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d3276e2e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d3276e2e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d3276e2e
Branch: refs/heads/KYLIN-2361
Commit: d3276e2e909d3001724ee8fda1304ae8b7f08c63
Parents: d23bf93
Author: xiefan46 <95...@qq.com>
Authored: Fri Jan 20 09:48:17 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Jan 23 16:23:56 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 22 +-
.../test_case_data/sandbox/kylin.properties | 4 +
.../storage/hdfs/ITHDFSResourceStoreTest.java | 66 +-----
.../kylin/storage/hdfs/ITLockManagerTest.java | 205 +++++++++++++++++++
.../kylin/storage/hbase/HBaseResourceStore.java | 1 +
.../org/apache/kylin/storage/hdfs/HDFSLock.java | 41 ----
.../kylin/storage/hdfs/HDFSLockManager.java | 45 ----
.../kylin/storage/hdfs/HDFSResourceStore.java | 95 +++++++--
.../apache/kylin/storage/hdfs/LockManager.java | 116 +++++++++++
.../apache/kylin/storage/hdfs/ResourceLock.java | 51 +++++
10 files changed, 471 insertions(+), 175 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 44d636d..75b38ff 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -193,6 +193,14 @@ abstract public class KylinConfigBase implements Serializable {
return new StringBuffer(root).append(StringUtils.replaceChars(getMetadataUrlPrefix(), ':', '-')).append("/").toString();
}
+ public String getRawHdfsWorkingDirectory() {
+ String root = getRequired("kylin.env.hdfs-working-dir");
+ if (!root.endsWith("/")) {
+ root += "/";
+ }
+ return root;
+ }
+
// ============================================================================
// METADATA
// ============================================================================
@@ -201,11 +209,6 @@ abstract public class KylinConfigBase implements Serializable {
return getOptional("kylin.metadata.url");
}
- //for hdfs resource store
- public String getHDFSMetadataUrl() {
- return getOptional("kylin.metadata.hdfs.url", "kylin_default_instance_hdfs@hdfs");
- }
-
// for test only
public void setMetadataUrl(String metadataUrl) {
setProperty("kylin.metadata.url", metadataUrl);
@@ -925,4 +928,13 @@ abstract public class KylinConfigBase implements Serializable {
return Boolean.parseBoolean(getOptional("kylin.web.cross-domain-enabled", "true"));
}
+ //zoo keeper
+ public String getZooKeeperHost() {
+ return getOptional("kylin.storage-zookeeper.host", "localhost");
+ }
+
+ public String getZooKeeperPort() {
+ return getOptional("kylin.storage-zookeeper.port", "2181");
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index b01c377..2c2da91 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -177,3 +177,7 @@ kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history
#kylin.engine.spark-conf.spark.yarn.queue=default
#kylin.engine.spark-conf.spark.yarn.jar=hdfs://sandbox.hortonworks.com:8020/kylin/spark/spark-assembly-1.6.3-hadoop2.6.0.jar
#kylin.engine.spark-conf.spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec
+
+
+#zoo keeper
+kylin.storage-zookeeper.host=sandbox
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
index ef04957..27d8a3c 100644
--- a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
@@ -17,23 +17,15 @@
*/
package org.apache.kylin.storage.hdfs;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.ResourceStoreTest;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.junit.Assert.assertEquals;
/**
* Created by xiefan on 17-1-10.
@@ -53,65 +45,13 @@ public class ITHDFSResourceStoreTest extends HBaseMetadataTestCase {
this.cleanupTestMetadata();
}
- @Ignore
- @Test
- public void testHDFSUrl() throws Exception {
- assertEquals("kylin_default_instance_hdfs@hdfs", kylinConfig.getHDFSMetadataUrl());
- System.out.println("hdfs working dir : " + kylinConfig.getHdfsWorkingDirectory());
- }
-
- @Ignore
@Test
- public void testMultiThreadWriteHDFS() throws Exception{
- //System.out.println(kylinConfig.getHdfsWorkingDirectory());
- final Path testDir = new Path("hdfs:///test123");
- final FileSystem fs = HadoopUtil.getFileSystem(testDir);
- final String fileName = "test.json";
- int threadNum = 3;
- ExecutorService service = Executors.newFixedThreadPool(threadNum);
- final CountDownLatch latch = new CountDownLatch(threadNum);
- Path p = new Path(testDir,fileName);
- fs.deleteOnExit(p);
- fs.createNewFile(p);
- for(int i=0;i<threadNum;i++) {
- service.execute(new Runnable() {
- @Override
- public void run() {
- try {
- long id = Thread.currentThread().getId();
- Path p = new Path(testDir, fileName);
- /*while(fs.exists(p)){
- System.out.println("Thread id : " + id + " can not get lock,sleep a while");
- Thread.currentThread().sleep(1000);
- }*/
- while(!fs.createNewFile(p)){
- System.out.println("Thread id : " + id + " can not get lock,sleep a while");
- Thread.currentThread().sleep(1000);
- }
- System.out.println("Thread id : " + id + " get lock, sleep a while");
- Thread.currentThread().sleep(1000);
- fs.delete(p,true);
- System.out.println("Thread id : " + id + " release lock");
- latch.countDown();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
- Thread.currentThread().sleep(1000);
- fs.delete(p,true);
- System.out.println("main thread release lock.Waiting threads down");
- System.out.println("file still exist : " + fs.exists(p));
- latch.await();
- }
-
- @Test
- public void testHDFSStore() throws Exception {
+ public void testResourceStoreBasic() throws Exception {
KylinConfig config = KylinConfig.getInstanceFromEnv();
ResourceStore store = new HDFSResourceStore(config);
ResourceStoreTest.testAStore(store);
}
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java
new file mode 100644
index 0000000..2b58d30
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hdfs;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+
+public class ITLockManagerTest extends HBaseMetadataTestCase {
+
+
+ private String zkConnection = "sandbox:2181";
+
+ private KylinConfig kylinConfig;
+
+ private CuratorFramework zkClient;
+
+ private static final String lockRootPath = "/test_lock";
+
+ private LockManager manager;
+
+ private static final int QTY = 5;
+
+ private static final int REPETITIONS = QTY * 10;
+
+ @Before
+ public void setup() throws Exception {
+ this.createTestMetadata();
+ kylinConfig = KylinConfig.getInstanceFromEnv();
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ zkClient = CuratorFrameworkFactory.newClient(zkConnection, retryPolicy);
+ zkClient.start();
+ manager = new LockManager(kylinConfig, lockRootPath);
+ System.out.println("nodes in lock root : " + zkClient.getChildren().forPath(lockRootPath));
+
+ }
+
+ @After
+ public void after() throws Exception {
+ this.cleanupTestMetadata();
+ zkClient.delete().deletingChildrenIfNeeded().forPath(lockRootPath);
+ List<String> nodes = zkClient.getChildren().forPath("/");
+ System.out.println("nodes in zk after delete : " + nodes);
+ manager.close();
+ }
+
+ @Test
+ public void testCreateLock() throws Exception {
+
+ ResourceLock lock = manager.getLock("/dictionary/numberdict.json");
+ lock.acquire();
+ manager.releaseLock(lock);
+ System.out.println(zkClient.getChildren().forPath(lockRootPath + "/dictionary"));
+ List<String> nodes = zkClient.getChildren().forPath(lockRootPath + "/dictionary");
+ assertEquals(1, nodes.size());
+ assertEquals("numberdict.json", nodes.get(0));
+ }
+
+ @Test
+ public void testLockSafty() throws Exception {
+ // all of the useful sample code is in ExampleClientThatLocks.java
+
+ // FakeLimitedResource simulates some external resource that can only be access by one process at a time
+ final FakeLimitedResource resource = new FakeLimitedResource();
+ ExecutorService service = Executors.newFixedThreadPool(QTY);
+ final TestingServer server = new TestingServer(zkConnection);
+ final List<FutureTask<Void>> tasks = new ArrayList<>();
+ try {
+ for (int i = 0; i < QTY; ++i) {
+ final int index = i;
+ FutureTask<Void> task = new FutureTask<Void>(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ LockManager threadLocalLockManager = new LockManager(kylinConfig, lockRootPath);
+ try {
+ ExampleClientThatLocks example = new ExampleClientThatLocks(threadLocalLockManager, lockRootPath, resource, "Client " + index);
+ for (int j = 0; j < REPETITIONS; ++j) {
+ example.doWork(10, TimeUnit.SECONDS);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ e.printStackTrace();
+ // log or do something
+ } finally {
+ threadLocalLockManager.close();
+ }
+ return null;
+ }
+ });
+ tasks.add(task);
+ service.submit(task);
+ }
+ for (FutureTask<Void> task : tasks) {
+ task.get();
+ }
+ } finally {
+ CloseableUtils.closeQuietly(server);
+ }
+ }
+
+ class FakeLimitedResource {
+ private final AtomicBoolean inUse = new AtomicBoolean(false);
+
+ public void use() throws InterruptedException {
+ // in a real application this would be accessing/manipulating a shared resource
+
+ if (!inUse.compareAndSet(false, true)) {
+ throw new IllegalStateException("Needs to be used by one client at a time");
+ }
+
+ try {
+ Thread.sleep((long) (3 * Math.random()));
+ } finally {
+ inUse.set(false);
+ }
+ }
+ }
+
+ class TestingServer implements Closeable {
+
+ private String connectionString;
+
+ public TestingServer(String connectionStr) {
+ this.connectionString = connectionStr;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ public String getConnectString() {
+ return connectionString;
+ }
+ }
+
+ class ExampleClientThatLocks {
+
+ private final FakeLimitedResource resource;
+
+ private final String clientName;
+
+ private LockManager lockManager;
+
+ private String lockPath;
+
+ public ExampleClientThatLocks(LockManager lockManager, String lockPath, FakeLimitedResource resource, String clientName) {
+ this.resource = resource;
+ this.clientName = clientName;
+ this.lockManager = lockManager;
+ this.lockPath = lockPath;
+ }
+
+ public void doWork(long time, TimeUnit unit) throws Exception {
+ ResourceLock lock = lockManager.getLock(lockPath);
+ if (!lock.acquire(time, unit)) {
+ throw new IllegalStateException(clientName + " could not acquire the lock");
+ }
+ try {
+ System.out.println(clientName + " has the lock");
+ resource.use();
+ } finally {
+ System.out.println(clientName + " releasing the lock");
+ lock.release(); // always release the lock in a finally block
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 1c45967..170e351 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -311,6 +311,7 @@ public class HBaseResourceStore extends ResourceStore {
} finally {
IOUtils.closeQuietly(table);
}
+
}
private Result internalGetFromHTable(Table table, String path, boolean fetchContent, boolean fetchTimestamp) throws IOException {
http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
deleted file mode 100644
index 8710edf..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package org.apache.kylin.storage.hdfs;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-
-/**
- * Created by xiefan on 17-1-17.
- */
-public class HDFSLock {
-
- private Path rawLock;
-
- private static final Logger logger = LoggerFactory.getLogger(HDFSLock.class);
-
- protected HDFSLock(String resourceFullPath) {
- this.rawLock = new Path(resourceFullPath);
- }
-
- public boolean init(FileSystem fs) throws IOException, InterruptedException {
- if (!fs.isFile(rawLock)) {
- logger.info("Not support directory lock yet");
- return false;
- }
- while (!fs.createNewFile(rawLock)) {
- Thread.currentThread().sleep(1000);
- }
- return true;
- }
-
- public boolean release(FileSystem fs) throws IOException, InterruptedException {
- while (!fs.delete(rawLock, false)) {
- Thread.currentThread().sleep(1000);
- }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
deleted file mode 100644
index 1cd0800..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.kylin.storage.hdfs;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.engine.mr.HadoopUtil;
-
-import java.io.IOException;
-
-/**
- * Created by xiefan on 17-1-17.
- */
-public class HDFSLockManager {
-
- private static final String LOCK_HOME = "LOCK_HOME";
-
- private Path lockPath;
-
- private FileSystem fs;
-
- public HDFSLockManager(String hdfsWorkingDir) throws IOException{
- this.lockPath = new Path(hdfsWorkingDir,LOCK_HOME);
- this.fs = HadoopUtil.getFileSystem(lockPath);
- if(!fs.exists(lockPath)){
- fs.create(lockPath);
- }
- }
-
- public HDFSLock getLock(String resourceFullPath) throws IOException,InterruptedException,IllegalStateException{
- HDFSLock lock = new HDFSLock(resourceFullPath);
- boolean success = lock.init(fs);
- if(success){
- return lock;
- }else{
- throw new IllegalStateException("Try get lock fail. Resourse path : " + resourceFullPath);
- }
- }
-
- public void releaseLock(HDFSLock lock) throws IOException,InterruptedException,IllegalStateException{
- boolean success = lock.release(fs);
- if(!success)
- throw new IllegalStateException("Release lock fail");
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
index c7f0f25..a746a97 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.RawResource;
import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.common.util.HadoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,44 +39,62 @@ import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
-/**
- * Created by xiefan on 17-1-10.
- */
public class HDFSResourceStore extends ResourceStore {
- private static final String DEFAULT_TABLE_NAME = "kylin_default_instance_hdfs";
+ private static final Logger logger = LoggerFactory.getLogger(HDFSResourceStore.class);
+
+ private static final long DEFAULT_ACQUIRE_LOCK_TIMEOUT = 10;
+
+ private static final String DEFAULT_FOLDER_NAME = "kylin_default_instance";
+
+ private static final String DEFAULT_METADATA_FOLDER_NAME = "hdfs_metadata";
private Path hdfsMetaPath;
private FileSystem fs;
- private HDFSLockManager lockManager;
-
- private static final Logger logger = LoggerFactory.getLogger(HDFSResourceStore.class);
+ private LockManager lockManager;
//public for test. Normal should be protected
- public HDFSResourceStore(KylinConfig kylinConfig) throws IOException {
+ public HDFSResourceStore(KylinConfig kylinConfig) throws Exception {
super(kylinConfig);
- String metadataUrl = kylinConfig.getHDFSMetadataUrl();
- // split TABLE@HBASE_URL
+ String metadataUrl = kylinConfig.getMetadataUrl();
int cut = metadataUrl.indexOf('@');
- String metaDirName = cut < 0 ? DEFAULT_TABLE_NAME : metadataUrl.substring(0, cut);
+ String metaDirName = cut < 0 ? DEFAULT_FOLDER_NAME : metadataUrl.substring(0, cut);
+ metaDirName += "/" + DEFAULT_METADATA_FOLDER_NAME;
+ logger.info("meta dir name :" + metaDirName);
createMetaFolder(metaDirName, kylinConfig);
}
- private void createMetaFolder(String metaDirName, KylinConfig kylinConfig) throws IOException {
+ private void createMetaFolder(String metaDirName, KylinConfig kylinConfig) throws Exception {
String hdfsWorkingDir = kylinConfig.getHdfsWorkingDirectory();
fs = HadoopUtil.getFileSystem(hdfsWorkingDir);
+ logger.info("hdfs working dir : " + hdfsWorkingDir);
Path hdfsWorkingPath = new Path(hdfsWorkingDir);
if (!fs.exists(hdfsWorkingPath)) {
throw new IOException("HDFS working dir not exist");
}
+ //creat lock manager
+ this.lockManager = new LockManager(kylinConfig, kylinConfig.getRawHdfsWorkingDirectory() + metaDirName);
+ //create hdfs meta path
hdfsMetaPath = new Path(hdfsWorkingPath, metaDirName);
if (!fs.exists(hdfsMetaPath)) {
- fs.create(hdfsMetaPath, true);
+ ResourceLock lock = lockManager.getLock(lockManager.getLockPath("/"));
+ try {
+ if (lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.MINUTES)) {
+ logger.info("get root lock successfully");
+ if (!fs.exists(hdfsMetaPath)) {
+ fs.mkdirs(hdfsMetaPath);
+ logger.info("create hdfs meta path");
+ }
+ }
+ } finally {
+ lockManager.releaseLock(lock);
+ }
}
- lockManager = new HDFSLockManager(hdfsWorkingDir);
+ logger.info("hdfs meta path : " + hdfsMetaPath.toString());
}
@Override
@@ -132,7 +150,8 @@ public class HDFSResourceStore extends ResourceStore {
logger.warn("Zero length file: " + p.toString());
}
FSDataInputStream in = fs.open(p);
- return new RawResource(fs.open(p), getResourceTimestamp(resPath));
+ long t = in.readLong();
+ return new RawResource(in, t);
} else {
return null;
}
@@ -144,19 +163,42 @@ public class HDFSResourceStore extends ResourceStore {
if (!fs.exists(p) || !fs.isFile(p)) {
return 0;
}
- FileStatus status = fs.getFileStatus(p);
- return status.getModificationTime();
+ FSDataInputStream in = null;
+ ResourceLock lock = null;
+ try {
+ lock = lockManager.getLock(resPath);
+ lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.MINUTES);
+ in = fs.open(p);
+ long t = in.readLong();
+ return t;
+ } catch (Exception e) {
+ throw new IOException("Put resource fail", e);
+ } finally {
+ IOUtils.closeQuietly(in);
+ lockManager.releaseLock(lock);
+ }
+
}
@Override
protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException {
+ logger.info("res path : " + resPath);
Path p = getRealHDFSPath(resPath);
+ logger.info("put resource : " + p.toUri());
FSDataOutputStream out = null;
+ ResourceLock lock = null;
try {
+ lock = lockManager.getLock(resPath);
+ lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.MINUTES);
out = fs.create(p, true);
+ out.writeLong(ts);
IOUtils.copy(content, out);
+
+ } catch (Exception e) {
+ throw new IOException("Put resource fail", e);
} finally {
IOUtils.closeQuietly(out);
+ lockManager.releaseLock(lock);
}
}
@@ -180,9 +222,18 @@ public class HDFSResourceStore extends ResourceStore {
@Override
protected void deleteResourceImpl(String resPath) throws IOException {
- Path p = getRealHDFSPath(resPath);
- if (fs.exists(p)) {
- fs.delete(p, true);
+ ResourceLock lock = null;
+ try {
+ lock = lockManager.getLock(resPath);
+ lock.acquire(DEFAULT_ACQUIRE_LOCK_TIMEOUT, TimeUnit.MINUTES);
+ Path p = getRealHDFSPath(resPath);
+ if (fs.exists(p)) {
+ fs.delete(p, true);
+ }
+ } catch (Exception e) {
+ throw new IOException("Delete resource fail", e);
+ } finally {
+ lockManager.releaseLock(lock);
}
}
@@ -192,6 +243,8 @@ public class HDFSResourceStore extends ResourceStore {
}
private Path getRealHDFSPath(String resourcePath) {
+ if (resourcePath.startsWith("/") && resourcePath.length() > 1)
+ resourcePath = resourcePath.substring(1, resourcePath.length());
return new Path(this.hdfsMetaPath, resourcePath);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java
new file mode 100644
index 0000000..4959718
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hdfs;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+
+public class LockManager {
+
+ private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedJobLock.class);
+
+ final private KylinConfig config;
+
+ final CuratorFramework zkClient;
+
+ private String lockRootPath;
+
+ public LockManager(String lockRootPath) throws Exception {
+
+ this(KylinConfig.getInstanceFromEnv(), lockRootPath);
+ }
+
+ public LockManager(KylinConfig config, String lockRootPath) throws Exception {
+ this.config = config;
+ this.lockRootPath = lockRootPath;
+ String zkConnectString = getZKConnectString(config);
+ logger.info("zk connection string:" + zkConnectString);
+ if (StringUtils.isEmpty(zkConnectString)) {
+ throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
+ }
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ zkClient = CuratorFrameworkFactory.newClient(zkConnectString, retryPolicy);
+ zkClient.start();
+ if (zkClient.checkExists().forPath(lockRootPath) == null)
+ zkClient.create().creatingParentsIfNeeded().forPath(lockRootPath);
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ close();
+ }
+ }));
+ }
+
+ public ResourceLock getLock(String name) throws Exception {
+ String lockPath = getLockPath(name);
+ InterProcessMutex lock = new InterProcessMutex(zkClient, lockPath);
+ return new ResourceLock(lockPath, lock);
+ }
+
+ public void releaseLock(ResourceLock lock) {
+ try {
+ if (lock != null)
+ lock.release();
+ } catch (Exception e) {
+ logger.error("Fail to release lock");
+ e.printStackTrace();
+ }
+ }
+
+ private static String getZKConnectString(KylinConfig kylinConfig) {
+ final String host = kylinConfig.getZooKeeperHost();
+ final String port = kylinConfig.getZooKeeperPort();
+ return StringUtils.join(Iterables.transform(Arrays.asList(host.split(",")), new Function<String, String>() {
+ @Nullable
+ @Override
+ public String apply(String input) {
+ return input + ":" + port;
+ }
+ }), ",");
+ }
+
+ public String getLockPath(String resourceName) {
+ if (!resourceName.startsWith("/"))
+ resourceName = "/" + resourceName;
+ if (resourceName.endsWith("/"))
+ resourceName = resourceName.substring(0, resourceName.length() - 1);
+ return lockRootPath + resourceName;
+ }
+
+ public void close() {
+ try {
+ zkClient.close();
+ } catch (Exception e) {
+ logger.error("error occurred to close PathChildrenCache", e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d3276e2e/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java
new file mode 100644
index 0000000..9d51871
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hdfs;
+
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+
+import java.util.concurrent.TimeUnit;
+
+
+public class ResourceLock {
+
+ private String resourcePath;
+
+ private InterProcessMutex lock;
+
+ protected ResourceLock(String resourcePath, InterProcessMutex lock) {
+ this.resourcePath = resourcePath;
+ this.lock = lock;
+ }
+
+ public boolean acquire(long time, TimeUnit unit) throws Exception {
+ return lock.acquire(time, unit);
+ }
+
+ public void acquire() throws Exception{
+ lock.acquire();
+ }
+
+ protected void release() throws Exception {
+ lock.release();
+ }
+
+ public String getResourcePath() {
+ return resourcePath;
+ }
+}