You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2017/01/20 09:44:05 UTC
[1/2] kylin git commit: KYLIN-2374 Allow kylin to store metadata in
HDFS instead of HBase [Forced Update!]
Repository: kylin
Updated Branches:
refs/heads/KYLIN-2374 870cbf269 -> 24cae5c73 (forced update)
KYLIN-2374 Allow kylin to store metadata in HDFS instead of HBase
Signed-off-by: shaofengshi <sh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0f88801a
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0f88801a
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0f88801a
Branch: refs/heads/KYLIN-2374
Commit: 0f88801a1e696eb58c58ec05e57f296a2ca6542e
Parents: 1e4ae54
Author: xiefan46 <95...@qq.com>
Authored: Wed Jan 11 10:00:19 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Jan 20 17:43:38 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 9 +-
.../common/persistence/ResourceStoreTest.java | 3 +-
.../test_case_data/sandbox/kylin.properties | 1 +
.../storage/hdfs/ITHDFSResourceStoreTest.java | 117 +++++++++++
.../org/apache/kylin/storage/hdfs/HDFSLock.java | 41 ++++
.../kylin/storage/hdfs/HDFSLockManager.java | 45 +++++
.../kylin/storage/hdfs/HDFSResourceStore.java | 198 +++++++++++++++++++
7 files changed, 411 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/0f88801a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 74903d5..29ad5eb 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -181,7 +181,7 @@ abstract public class KylinConfigBase implements Serializable {
if (!root.endsWith("/")) {
root += "/";
}
-
+
// make sure path qualified
if (!root.contains("://")) {
if (!root.startsWith("/"))
@@ -189,7 +189,7 @@ abstract public class KylinConfigBase implements Serializable {
else
root = "hdfs://" + root;
}
-
+
return new StringBuffer(root).append(StringUtils.replaceChars(getMetadataUrlPrefix(), ':', '-')).append("/").toString();
}
@@ -201,6 +201,11 @@ abstract public class KylinConfigBase implements Serializable {
return getOptional("kylin.metadata.url");
}
+ //for hdfs resource store
+ public String getHDFSMetadataUrl() {
+ return getOptional("kylin.metadata.hdfs.url", "kylin_default_instance_hdfs@hdfs");
+ }
+
// for test only
public void setMetadataUrl(String metadataUrl) {
setProperty("kylin.metadata.url", metadataUrl);
http://git-wip-us.apache.org/repos/asf/kylin/blob/0f88801a/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java b/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
index 4c31a15..ddaf481 100644
--- a/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java
@@ -110,9 +110,10 @@ public class ResourceStoreTest {
}
// list
- NavigableSet<String> list;
+ NavigableSet<String> list = null;
list = store.listResources(dir1);
+ System.out.println(list);
assertTrue(list.contains(path1));
assertTrue(list.contains(path2) == false);
http://git-wip-us.apache.org/repos/asf/kylin/blob/0f88801a/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index 06f8e4b..2d3cd79 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -41,6 +41,7 @@ kylin.source.hive.client=cli
# The metadata store in hbase
kylin.metadata.url=kylin_default_instance@hbase
+
# The storage for final cube file in hbase
kylin.storage.url=hbase
http://git-wip-us.apache.org/repos/asf/kylin/blob/0f88801a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
new file mode 100644
index 0000000..ef04957
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hdfs;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.ResourceStoreTest;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Created by xiefan on 17-1-10.
+ */
+public class ITHDFSResourceStoreTest extends HBaseMetadataTestCase {
+
+ KylinConfig kylinConfig;
+
+ @Before
+ public void setup() throws Exception {
+ this.createTestMetadata();
+ kylinConfig = KylinConfig.getInstanceFromEnv();
+ }
+
+ @After
+ public void after() throws Exception {
+ this.cleanupTestMetadata();
+ }
+
+ @Ignore
+ @Test
+ public void testHDFSUrl() throws Exception {
+ assertEquals("kylin_default_instance_hdfs@hdfs", kylinConfig.getHDFSMetadataUrl());
+ System.out.println("hdfs working dir : " + kylinConfig.getHdfsWorkingDirectory());
+ }
+
+
+ @Ignore
+ @Test
+ public void testMultiThreadWriteHDFS() throws Exception{
+ //System.out.println(kylinConfig.getHdfsWorkingDirectory());
+ final Path testDir = new Path("hdfs:///test123");
+ final FileSystem fs = HadoopUtil.getFileSystem(testDir);
+ final String fileName = "test.json";
+ int threadNum = 3;
+ ExecutorService service = Executors.newFixedThreadPool(threadNum);
+ final CountDownLatch latch = new CountDownLatch(threadNum);
+ Path p = new Path(testDir,fileName);
+ fs.deleteOnExit(p);
+ fs.createNewFile(p);
+ for(int i=0;i<threadNum;i++) {
+ service.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ long id = Thread.currentThread().getId();
+ Path p = new Path(testDir, fileName);
+ /*while(fs.exists(p)){
+ System.out.println("Thread id : " + id + " can not get lock,sleep a while");
+ Thread.currentThread().sleep(1000);
+ }*/
+ while(!fs.createNewFile(p)){
+ System.out.println("Thread id : " + id + " can not get lock,sleep a while");
+ Thread.currentThread().sleep(1000);
+ }
+ System.out.println("Thread id : " + id + " get lock, sleep a while");
+ Thread.currentThread().sleep(1000);
+ fs.delete(p,true);
+ System.out.println("Thread id : " + id + " release lock");
+ latch.countDown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+ Thread.currentThread().sleep(1000);
+ fs.delete(p,true);
+ System.out.println("main thread release lock.Waiting threads down");
+ System.out.println("file still exist : " + fs.exists(p));
+ latch.await();
+ }
+
+ @Test
+ public void testHDFSStore() throws Exception {
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ ResourceStore store = new HDFSResourceStore(config);
+ ResourceStoreTest.testAStore(store);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0f88801a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
new file mode 100644
index 0000000..8710edf
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
@@ -0,0 +1,41 @@
+package org.apache.kylin.storage.hdfs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Created by xiefan on 17-1-17.
+ */
+public class HDFSLock {
+
+ private Path rawLock;
+
+ private static final Logger logger = LoggerFactory.getLogger(HDFSLock.class);
+
+ protected HDFSLock(String resourceFullPath) {
+ this.rawLock = new Path(resourceFullPath);
+ }
+
+ public boolean init(FileSystem fs) throws IOException, InterruptedException {
+ if (!fs.isFile(rawLock)) {
+ logger.info("Not support directory lock yet");
+ return false;
+ }
+ while (!fs.createNewFile(rawLock)) {
+ Thread.currentThread().sleep(1000);
+ }
+ return true;
+ }
+
+ public boolean release(FileSystem fs) throws IOException, InterruptedException {
+ while (!fs.delete(rawLock, false)) {
+ Thread.currentThread().sleep(1000);
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0f88801a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
new file mode 100644
index 0000000..1cd0800
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
@@ -0,0 +1,45 @@
+package org.apache.kylin.storage.hdfs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.engine.mr.HadoopUtil;
+
+import java.io.IOException;
+
+/**
+ * Created by xiefan on 17-1-17.
+ */
+public class HDFSLockManager {
+
+ private static final String LOCK_HOME = "LOCK_HOME";
+
+ private Path lockPath;
+
+ private FileSystem fs;
+
+ public HDFSLockManager(String hdfsWorkingDir) throws IOException{
+ this.lockPath = new Path(hdfsWorkingDir,LOCK_HOME);
+ this.fs = HadoopUtil.getFileSystem(lockPath);
+ if(!fs.exists(lockPath)){
+ fs.create(lockPath);
+ }
+ }
+
+ public HDFSLock getLock(String resourceFullPath) throws IOException,InterruptedException,IllegalStateException{
+ HDFSLock lock = new HDFSLock(resourceFullPath);
+ boolean success = lock.init(fs);
+ if(success){
+ return lock;
+ }else{
+ throw new IllegalStateException("Try get lock fail. Resourse path : " + resourceFullPath);
+ }
+ }
+
+ public void releaseLock(HDFSLock lock) throws IOException,InterruptedException,IllegalStateException{
+ boolean success = lock.release(fs);
+ if(!success)
+ throw new IllegalStateException("Release lock fail");
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0f88801a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
new file mode 100644
index 0000000..c7f0f25
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hdfs;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+/**
+ * Created by xiefan on 17-1-10.
+ */
+public class HDFSResourceStore extends ResourceStore {
+
+ private static final String DEFAULT_TABLE_NAME = "kylin_default_instance_hdfs";
+
+ private Path hdfsMetaPath;
+
+ private FileSystem fs;
+
+ private HDFSLockManager lockManager;
+
+ private static final Logger logger = LoggerFactory.getLogger(HDFSResourceStore.class);
+
+ //public for test. Normal should be protected
+ public HDFSResourceStore(KylinConfig kylinConfig) throws IOException {
+ super(kylinConfig);
+ String metadataUrl = kylinConfig.getHDFSMetadataUrl();
+ // split TABLE@HBASE_URL
+ int cut = metadataUrl.indexOf('@');
+ String metaDirName = cut < 0 ? DEFAULT_TABLE_NAME : metadataUrl.substring(0, cut);
+ createMetaFolder(metaDirName, kylinConfig);
+ }
+
+ private void createMetaFolder(String metaDirName, KylinConfig kylinConfig) throws IOException {
+ String hdfsWorkingDir = kylinConfig.getHdfsWorkingDirectory();
+ fs = HadoopUtil.getFileSystem(hdfsWorkingDir);
+ Path hdfsWorkingPath = new Path(hdfsWorkingDir);
+ if (!fs.exists(hdfsWorkingPath)) {
+ throw new IOException("HDFS working dir not exist");
+ }
+ hdfsMetaPath = new Path(hdfsWorkingPath, metaDirName);
+ if (!fs.exists(hdfsMetaPath)) {
+ fs.create(hdfsMetaPath, true);
+ }
+ lockManager = new HDFSLockManager(hdfsWorkingDir);
+ }
+
+ @Override
+ protected NavigableSet<String> listResourcesImpl(String folderPath) throws IOException {
+ Path p = getRealHDFSPath(folderPath);
+ if (!fs.exists(p) || !fs.isDirectory(p)) {
+ return null;
+ }
+ TreeSet<String> r = new TreeSet<>();
+ FileStatus[] statuses = fs.listStatus(p);
+ String prefix = folderPath.endsWith("/") ? folderPath : folderPath + "/";
+ for (FileStatus status : statuses) {
+ r.add(prefix + status.getPath().getName());
+ }
+ return r;
+ }
+
+ @Override
+ protected boolean existsImpl(String resPath) throws IOException {
+ Path p = getRealHDFSPath(resPath);
+ return fs.exists(p) && fs.isFile(p);
+ }
+
+ @Override
+ protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive) throws IOException {
+ NavigableSet<String> resources = listResources(folderPath);
+ if (resources == null)
+ return Collections.emptyList();
+ List<RawResource> result = Lists.newArrayListWithCapacity(resources.size());
+ try {
+ for (String res : resources) {
+ long ts = getResourceTimestampImpl(res);
+ if (timeStart <= ts && ts < timeEndExclusive) {
+ RawResource resource = getResourceImpl(res);
+ if (resource != null) // can be null if is a sub-folder
+ result.add(resource);
+ }
+ }
+ } catch (IOException ex) {
+ for (RawResource rawResource : result) {
+ IOUtils.closeQuietly(rawResource.inputStream);
+ }
+ throw ex;
+ }
+ return result;
+ }
+
+ @Override
+ protected RawResource getResourceImpl(String resPath) throws IOException {
+ Path p = getRealHDFSPath(resPath);
+ if (fs.exists(p) && fs.isFile(p)) {
+ if (fs.getFileStatus(p).getLen() == 0) {
+ logger.warn("Zero length file: " + p.toString());
+ }
+ FSDataInputStream in = fs.open(p);
+ return new RawResource(fs.open(p), getResourceTimestamp(resPath));
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ protected long getResourceTimestampImpl(String resPath) throws IOException {
+ Path p = getRealHDFSPath(resPath);
+ if (!fs.exists(p) || !fs.isFile(p)) {
+ return 0;
+ }
+ FileStatus status = fs.getFileStatus(p);
+ return status.getModificationTime();
+ }
+
+ @Override
+ protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException {
+ Path p = getRealHDFSPath(resPath);
+ FSDataOutputStream out = null;
+ try {
+ out = fs.create(p, true);
+ IOUtils.copy(content, out);
+ } finally {
+ IOUtils.closeQuietly(out);
+ }
+ }
+
+ @Override
+ protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException {
+ Path p = getRealHDFSPath(resPath);
+ if (!fs.exists(p)) {
+ if (oldTS != 0) {
+ throw new IllegalStateException("For not exist file. OldTS have to be 0. but Actual oldTS is : " + oldTS);
+ }
+
+ } else {
+ long realLastModify = getResourceTimestamp(resPath);
+ if (realLastModify != oldTS) {
+ throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but found " + realLastModify);
+ }
+ }
+ putResourceImpl(resPath, new ByteArrayInputStream(content), newTS);
+ return newTS;
+ }
+
+ @Override
+ protected void deleteResourceImpl(String resPath) throws IOException {
+ Path p = getRealHDFSPath(resPath);
+ if (fs.exists(p)) {
+ fs.delete(p, true);
+ }
+ }
+
+ @Override
+ protected String getReadableResourcePathImpl(String resPath) {
+ return getRealHDFSPath(resPath).toString();
+ }
+
+ private Path getRealHDFSPath(String resourcePath) {
+ return new Path(this.hdfsMetaPath, resourcePath);
+ }
+
+}
[2/2] kylin git commit: Add Zookeeper Lock
Posted by sh...@apache.org.
Add Zookeeper Lock
Signed-off-by: shaofengshi <sh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/24cae5c7
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/24cae5c7
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/24cae5c7
Branch: refs/heads/KYLIN-2374
Commit: 24cae5c73889dcca4e18b2de4e12b68415ef2d62
Parents: 0f88801
Author: xiefan46 <95...@qq.com>
Authored: Fri Jan 20 09:48:17 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Jan 20 17:43:47 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 13 +-
.../storage/hdfs/ITHDFSResourceStoreTest.java | 65 +-----
.../kylin/storage/hdfs/ITLockManagerTest.java | 206 +++++++++++++++++++
.../kylin/storage/hbase/HBaseResourceStore.java | 1 +
.../org/apache/kylin/storage/hdfs/HDFSLock.java | 41 ----
.../kylin/storage/hdfs/HDFSLockManager.java | 45 ----
.../kylin/storage/hdfs/HDFSResourceStore.java | 90 ++++++--
.../apache/kylin/storage/hdfs/LockManager.java | 117 +++++++++++
.../apache/kylin/storage/hdfs/ResourceLock.java | 51 +++++
.../kylin/storage/hdfs/ZookeeperConfig.java | 27 +++
10 files changed, 483 insertions(+), 173 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 29ad5eb..0602e9d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -193,6 +193,14 @@ abstract public class KylinConfigBase implements Serializable {
return new StringBuffer(root).append(StringUtils.replaceChars(getMetadataUrlPrefix(), ':', '-')).append("/").toString();
}
+ public String getRawHdfsWorkingDirectory() {
+ String root = getRequired("kylin.env.hdfs-working-dir");
+ if (!root.endsWith("/")) {
+ root += "/";
+ }
+ return root;
+ }
+
// ============================================================================
// METADATA
// ============================================================================
@@ -201,11 +209,6 @@ abstract public class KylinConfigBase implements Serializable {
return getOptional("kylin.metadata.url");
}
- //for hdfs resource store
- public String getHDFSMetadataUrl() {
- return getOptional("kylin.metadata.hdfs.url", "kylin_default_instance_hdfs@hdfs");
- }
-
// for test only
public void setMetadataUrl(String metadataUrl) {
setProperty("kylin.metadata.url", metadataUrl);
http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
index ef04957..41bbcc0 100644
--- a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITHDFSResourceStoreTest.java
@@ -17,10 +17,8 @@
*/
package org.apache.kylin.storage.hdfs;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.ResourceStoreTest;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
@@ -29,11 +27,6 @@ import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.junit.Assert.assertEquals;
/**
* Created by xiefan on 17-1-10.
@@ -53,65 +46,13 @@ public class ITHDFSResourceStoreTest extends HBaseMetadataTestCase {
this.cleanupTestMetadata();
}
- @Ignore
- @Test
- public void testHDFSUrl() throws Exception {
- assertEquals("kylin_default_instance_hdfs@hdfs", kylinConfig.getHDFSMetadataUrl());
- System.out.println("hdfs working dir : " + kylinConfig.getHdfsWorkingDirectory());
- }
-
- @Ignore
@Test
- public void testMultiThreadWriteHDFS() throws Exception{
- //System.out.println(kylinConfig.getHdfsWorkingDirectory());
- final Path testDir = new Path("hdfs:///test123");
- final FileSystem fs = HadoopUtil.getFileSystem(testDir);
- final String fileName = "test.json";
- int threadNum = 3;
- ExecutorService service = Executors.newFixedThreadPool(threadNum);
- final CountDownLatch latch = new CountDownLatch(threadNum);
- Path p = new Path(testDir,fileName);
- fs.deleteOnExit(p);
- fs.createNewFile(p);
- for(int i=0;i<threadNum;i++) {
- service.execute(new Runnable() {
- @Override
- public void run() {
- try {
- long id = Thread.currentThread().getId();
- Path p = new Path(testDir, fileName);
- /*while(fs.exists(p)){
- System.out.println("Thread id : " + id + " can not get lock,sleep a while");
- Thread.currentThread().sleep(1000);
- }*/
- while(!fs.createNewFile(p)){
- System.out.println("Thread id : " + id + " can not get lock,sleep a while");
- Thread.currentThread().sleep(1000);
- }
- System.out.println("Thread id : " + id + " get lock, sleep a while");
- Thread.currentThread().sleep(1000);
- fs.delete(p,true);
- System.out.println("Thread id : " + id + " release lock");
- latch.countDown();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
- Thread.currentThread().sleep(1000);
- fs.delete(p,true);
- System.out.println("main thread release lock.Waiting threads down");
- System.out.println("file still exist : " + fs.exists(p));
- latch.await();
- }
-
- @Test
- public void testHDFSStore() throws Exception {
+ public void testResourceStoreBasic() throws Exception {
KylinConfig config = KylinConfig.getInstanceFromEnv();
ResourceStore store = new HDFSResourceStore(config);
ResourceStoreTest.testAStore(store);
}
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java
new file mode 100644
index 0000000..56347e4
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hdfs/ITLockManagerTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hdfs;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+
+public class ITLockManagerTest extends HBaseMetadataTestCase {
+
+
+ private String zkConnection = "sandbox:2181";
+
+ private KylinConfig kylinConfig;
+
+ private CuratorFramework zkClient;
+
+ private static final String lockRootPath = "/test_lock";
+
+ private LockManager manager;
+
+ private static final int QTY = 5;
+
+ private static final int REPETITIONS = QTY * 10;
+
+ @Before
+ public void setup() throws Exception {
+ this.createTestMetadata();
+ kylinConfig = KylinConfig.getInstanceFromEnv();
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ zkClient = CuratorFrameworkFactory.newClient(zkConnection, retryPolicy);
+ zkClient.start();
+ manager = new LockManager(kylinConfig, lockRootPath);
+ System.out.println("nodes in lock root : " + zkClient.getChildren().forPath(lockRootPath));
+
+ }
+
+ @After
+ public void after() throws Exception {
+ this.cleanupTestMetadata();
+ zkClient.delete().deletingChildrenIfNeeded().forPath(lockRootPath);
+ List<String> nodes = zkClient.getChildren().forPath("/");
+ System.out.println("nodes in zk after delete : " + nodes);
+ manager.close();
+ }
+
+ @Test
+ public void testCreateLock() throws Exception {
+
+ ResourceLock lock = manager.getLock("/dictionary/numberdict.json");
+ lock.acquire();
+ manager.releaseLock(lock);
+ System.out.println(zkClient.getChildren().forPath(lockRootPath + "/dictionary"));
+ List<String> nodes = zkClient.getChildren().forPath(lockRootPath + "/dictionary");
+ assertEquals(1, nodes.size());
+ assertEquals("numberdict.json", nodes.get(0));
+ }
+
+ @Test
+ public void testLockSafty() throws Exception {
+ // all of the useful sample code is in ExampleClientThatLocks.java
+
+ // FakeLimitedResource simulates some external resource that can only be access by one process at a time
+ final FakeLimitedResource resource = new FakeLimitedResource();
+ ExecutorService service = Executors.newFixedThreadPool(QTY);
+ final TestingServer server = new TestingServer(zkConnection);
+ final List<FutureTask<Void>> tasks = new ArrayList<>();
+ try {
+ for (int i = 0; i < QTY; ++i) {
+ final int index = i;
+ FutureTask<Void> task = new FutureTask<Void>(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ LockManager threadLocalLockManager = new LockManager(kylinConfig, lockRootPath);
+ try {
+ ExampleClientThatLocks example = new ExampleClientThatLocks(threadLocalLockManager, lockRootPath, resource, "Client " + index);
+ for (int j = 0; j < REPETITIONS; ++j) {
+ example.doWork(10, TimeUnit.SECONDS);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ e.printStackTrace();
+ // log or do something
+ } finally {
+ threadLocalLockManager.close();
+ }
+ return null;
+ }
+ });
+ tasks.add(task);
+ service.submit(task);
+ }
+ for (FutureTask<Void> task : tasks) {
+ task.get();
+ }
+ } finally {
+ CloseableUtils.closeQuietly(server);
+ }
+ }
+
+ class FakeLimitedResource {
+ private final AtomicBoolean inUse = new AtomicBoolean(false);
+
+ public void use() throws InterruptedException {
+ // in a real application this would be accessing/manipulating a shared resource
+
+ if (!inUse.compareAndSet(false, true)) {
+ throw new IllegalStateException("Needs to be used by one client at a time");
+ }
+
+ try {
+ Thread.sleep((long) (3 * Math.random()));
+ } finally {
+ inUse.set(false);
+ }
+ }
+ }
+
+ class TestingServer implements Closeable {
+
+ private String connectionString;
+
+ public TestingServer(String connectionStr) {
+ this.connectionString = connectionStr;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ public String getConnectString() {
+ return connectionString;
+ }
+ }
+
+ class ExampleClientThatLocks {
+
+ private final FakeLimitedResource resource;
+
+ private final String clientName;
+
+ private LockManager lockManager;
+
+ private String lockPath;
+
+ public ExampleClientThatLocks(LockManager lockManager, String lockPath, FakeLimitedResource resource, String clientName) {
+ this.resource = resource;
+ this.clientName = clientName;
+ this.lockManager = lockManager;
+ this.lockPath = lockPath;
+ }
+
+ public void doWork(long time, TimeUnit unit) throws Exception {
+ ResourceLock lock = lockManager.getLock(lockPath);
+ if (!lock.acquire(time, unit)) {
+ throw new IllegalStateException(clientName + " could not acquire the lock");
+ }
+ try {
+ System.out.println(clientName + " has the lock");
+ resource.use();
+ } finally {
+ System.out.println(clientName + " releasing the lock");
+ lock.release(); // always release the lock in a finally block
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 6217350..5980cb5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -314,6 +314,7 @@ public class HBaseResourceStore extends ResourceStore {
} finally {
IOUtils.closeQuietly(table);
}
+
}
private Result internalGetFromHTable(HTableInterface table, String path, boolean fetchContent, boolean fetchTimestamp) throws IOException {
http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
deleted file mode 100644
index 8710edf..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLock.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package org.apache.kylin.storage.hdfs;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-
-/**
- * Created by xiefan on 17-1-17.
- */
-public class HDFSLock {
-
- private Path rawLock;
-
- private static final Logger logger = LoggerFactory.getLogger(HDFSLock.class);
-
- protected HDFSLock(String resourceFullPath) {
- this.rawLock = new Path(resourceFullPath);
- }
-
- public boolean init(FileSystem fs) throws IOException, InterruptedException {
- if (!fs.isFile(rawLock)) {
- logger.info("Not support directory lock yet");
- return false;
- }
- while (!fs.createNewFile(rawLock)) {
- Thread.currentThread().sleep(1000);
- }
- return true;
- }
-
- public boolean release(FileSystem fs) throws IOException, InterruptedException {
- while (!fs.delete(rawLock, false)) {
- Thread.currentThread().sleep(1000);
- }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
deleted file mode 100644
index 1cd0800..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSLockManager.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.kylin.storage.hdfs;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.engine.mr.HadoopUtil;
-
-import java.io.IOException;
-
-/**
- * Created by xiefan on 17-1-17.
- */
-public class HDFSLockManager {
-
- private static final String LOCK_HOME = "LOCK_HOME";
-
- private Path lockPath;
-
- private FileSystem fs;
-
- public HDFSLockManager(String hdfsWorkingDir) throws IOException{
- this.lockPath = new Path(hdfsWorkingDir,LOCK_HOME);
- this.fs = HadoopUtil.getFileSystem(lockPath);
- if(!fs.exists(lockPath)){
- fs.create(lockPath);
- }
- }
-
- public HDFSLock getLock(String resourceFullPath) throws IOException,InterruptedException,IllegalStateException{
- HDFSLock lock = new HDFSLock(resourceFullPath);
- boolean success = lock.init(fs);
- if(success){
- return lock;
- }else{
- throw new IllegalStateException("Try get lock fail. Resourse path : " + resourceFullPath);
- }
- }
-
- public void releaseLock(HDFSLock lock) throws IOException,InterruptedException,IllegalStateException{
- boolean success = lock.release(fs);
- if(!success)
- throw new IllegalStateException("Release lock fail");
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
index c7f0f25..8cb9f0d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/HDFSResourceStore.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.RawResource;
import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.common.util.HadoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,44 +39,58 @@ import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
-/**
- * Created by xiefan on 17-1-10.
- */
public class HDFSResourceStore extends ResourceStore {
- private static final String DEFAULT_TABLE_NAME = "kylin_default_instance_hdfs";
+ private static final String DEFAULT_FOLDER_NAME = "kylin_default_instance";
private Path hdfsMetaPath;
private FileSystem fs;
- private HDFSLockManager lockManager;
-
private static final Logger logger = LoggerFactory.getLogger(HDFSResourceStore.class);
+ private LockManager lockManager;
+
//public for test. Normal should be protected
- public HDFSResourceStore(KylinConfig kylinConfig) throws IOException {
+ public HDFSResourceStore(KylinConfig kylinConfig) throws Exception {
super(kylinConfig);
- String metadataUrl = kylinConfig.getHDFSMetadataUrl();
- // split TABLE@HBASE_URL
+ String metadataUrl = kylinConfig.getMetadataUrl();
int cut = metadataUrl.indexOf('@');
- String metaDirName = cut < 0 ? DEFAULT_TABLE_NAME : metadataUrl.substring(0, cut);
+ String metaDirName = cut < 0 ? DEFAULT_FOLDER_NAME : metadataUrl.substring(0, cut);
+ metaDirName += "/hdfs_metadata";
+ logger.info("meta dir name :" + metaDirName);
createMetaFolder(metaDirName, kylinConfig);
}
- private void createMetaFolder(String metaDirName, KylinConfig kylinConfig) throws IOException {
+ private void createMetaFolder(String metaDirName, KylinConfig kylinConfig) throws Exception {
String hdfsWorkingDir = kylinConfig.getHdfsWorkingDirectory();
fs = HadoopUtil.getFileSystem(hdfsWorkingDir);
+ logger.info("hdfs working dir : " + hdfsWorkingDir);
Path hdfsWorkingPath = new Path(hdfsWorkingDir);
if (!fs.exists(hdfsWorkingPath)) {
throw new IOException("HDFS working dir not exist");
}
+ //creat lock manager
+ this.lockManager = new LockManager(kylinConfig,kylinConfig.getRawHdfsWorkingDirectory() + metaDirName);
+ //create hdfs meta path
hdfsMetaPath = new Path(hdfsWorkingPath, metaDirName);
if (!fs.exists(hdfsMetaPath)) {
- fs.create(hdfsMetaPath, true);
+ ResourceLock lock = lockManager.getLock(lockManager.getLockPath("/"));
+ try {
+ if (lock.acquire(ZookeeperConfig.DEFAULT_TIMEOUT, TimeUnit.MINUTES)) {
+ logger.info("get root lock successfully");
+ if (!fs.exists(hdfsMetaPath)) {
+ fs.mkdirs(hdfsMetaPath);
+ logger.info("create hdfs meta path");
+ }
+ }
+ } finally {
+ lockManager.releaseLock(lock);
+ }
}
- lockManager = new HDFSLockManager(hdfsWorkingDir);
+ logger.info("hdfs meta path : " + hdfsMetaPath.toString());
}
@Override
@@ -132,7 +146,8 @@ public class HDFSResourceStore extends ResourceStore {
logger.warn("Zero length file: " + p.toString());
}
FSDataInputStream in = fs.open(p);
- return new RawResource(fs.open(p), getResourceTimestamp(resPath));
+ long t = in.readLong();
+ return new RawResource(in, t);
} else {
return null;
}
@@ -144,19 +159,42 @@ public class HDFSResourceStore extends ResourceStore {
if (!fs.exists(p) || !fs.isFile(p)) {
return 0;
}
- FileStatus status = fs.getFileStatus(p);
- return status.getModificationTime();
+ FSDataInputStream in = null;
+ ResourceLock lock = null;
+ try {
+ lock = lockManager.getLock(resPath);
+ lock.acquire(ZookeeperConfig.DEFAULT_TIMEOUT, TimeUnit.MINUTES);
+ in = fs.open(p);
+ long t = in.readLong();
+ return t;
+ } catch (Exception e) {
+ throw new IOException("Put resource fail", e);
+ } finally {
+ IOUtils.closeQuietly(in);
+ lockManager.releaseLock(lock);
+ }
+
}
@Override
protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException {
+ logger.info("res path : " + resPath);
Path p = getRealHDFSPath(resPath);
+ logger.info("put resource : " + p.toUri());
FSDataOutputStream out = null;
+ ResourceLock lock = null;
try {
+ lock = lockManager.getLock(resPath);
+ lock.acquire(ZookeeperConfig.DEFAULT_TIMEOUT, TimeUnit.MINUTES);
out = fs.create(p, true);
+ out.writeLong(ts);
IOUtils.copy(content, out);
+
+ } catch (Exception e) {
+ throw new IOException("Put resource fail", e);
} finally {
IOUtils.closeQuietly(out);
+ lockManager.releaseLock(lock);
}
}
@@ -180,9 +218,18 @@ public class HDFSResourceStore extends ResourceStore {
@Override
protected void deleteResourceImpl(String resPath) throws IOException {
- Path p = getRealHDFSPath(resPath);
- if (fs.exists(p)) {
- fs.delete(p, true);
+ ResourceLock lock = null;
+ try {
+ lock = lockManager.getLock(resPath);
+ lock.acquire(ZookeeperConfig.DEFAULT_TIMEOUT, TimeUnit.MINUTES);
+ Path p = getRealHDFSPath(resPath);
+ if (fs.exists(p)) {
+ fs.delete(p, true);
+ }
+ } catch (Exception e) {
+ throw new IOException("Delete resource fail", e);
+ } finally {
+ lockManager.releaseLock(lock);
}
}
@@ -192,7 +239,10 @@ public class HDFSResourceStore extends ResourceStore {
}
private Path getRealHDFSPath(String resourcePath) {
+ if (resourcePath.startsWith("/") && resourcePath.length() > 1)
+ resourcePath = resourcePath.substring(1, resourcePath.length());
return new Path(this.hdfsMetaPath, resourcePath);
}
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java
new file mode 100644
index 0000000..a4d0080
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/LockManager.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hdfs;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+
+public class LockManager {
+
+ private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedJobLock.class);
+
+ final private KylinConfig config;
+
+ final CuratorFramework zkClient;
+
+ private String lockRootPath;
+
+ public LockManager(String lockRootPath) throws Exception{
+
+ this(KylinConfig.getInstanceFromEnv(),lockRootPath);
+ }
+
+ public LockManager(KylinConfig config,String lockRootPath) throws Exception{
+ this.config = config;
+ this.lockRootPath = lockRootPath;
+ String zkConnectString = getZKConnectString();
+ logger.info("zk connection string:" + zkConnectString);
+ if (StringUtils.isEmpty(zkConnectString)) {
+ throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
+ }
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ zkClient = CuratorFrameworkFactory.newClient(zkConnectString, retryPolicy);
+ zkClient.start();
+ if(zkClient.checkExists().forPath(lockRootPath) == null)
+ zkClient.create().creatingParentsIfNeeded().forPath(lockRootPath);
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ close();
+ }
+ }));
+ }
+
+ public ResourceLock getLock(String name) throws Exception {
+ String lockPath = getLockPath(name);
+ InterProcessMutex lock = new InterProcessMutex(zkClient, lockPath);
+ return new ResourceLock(lockPath, lock);
+ }
+
+ public void releaseLock(ResourceLock lock) {
+ try {
+ if (lock != null)
+ lock.release();
+ } catch (Exception e) {
+ logger.error("Fail to release lock");
+ e.printStackTrace();
+ }
+ }
+
+ private static String getZKConnectString() {
+ final String serverList = ZookeeperConfig.DEFAULT_ZK_HOST;
+ final String port = ZookeeperConfig.DEFAULT_ZK_PORT;
+ return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
+ @Nullable
+ @Override
+ public String apply(String input) {
+ return input + ":" + port;
+ }
+ }), ",");
+ }
+
+ public String getLockPath(String resourceName) {
+ if (!resourceName.startsWith("/"))
+ resourceName = "/" + resourceName;
+ if (resourceName.endsWith("/"))
+ resourceName = resourceName.substring(0, resourceName.length() - 1);
+ return lockRootPath + resourceName;
+ }
+
+ public void close() {
+ try {
+ zkClient.close();
+ } catch (Exception e) {
+ logger.error("error occurred to close PathChildrenCache", e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java
new file mode 100644
index 0000000..9d51871
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ResourceLock.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hdfs;
+
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+
+import java.util.concurrent.TimeUnit;
+
+
+public class ResourceLock {
+
+ private String resourcePath;
+
+ private InterProcessMutex lock;
+
+ protected ResourceLock(String resourcePath, InterProcessMutex lock) {
+ this.resourcePath = resourcePath;
+ this.lock = lock;
+ }
+
+ public boolean acquire(long time, TimeUnit unit) throws Exception {
+ return lock.acquire(time, unit);
+ }
+
+ public void acquire() throws Exception{
+ lock.acquire();
+ }
+
+ protected void release() throws Exception {
+ lock.release();
+ }
+
+ public String getResourcePath() {
+ return resourcePath;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/24cae5c7/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ZookeeperConfig.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ZookeeperConfig.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ZookeeperConfig.java
new file mode 100644
index 0000000..3f67e8d
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hdfs/ZookeeperConfig.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hdfs;
+
+
+public interface ZookeeperConfig {
+ String DEFAULT_ZK_HOST = "sandbox";
+
+ String DEFAULT_ZK_PORT = "2181";
+
+ long DEFAULT_TIMEOUT = 10;
+}