You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/06/26 01:43:20 UTC
git commit: [HELIX-130] Add a zk copy tool, rb=22986
Repository: helix
Updated Branches:
refs/heads/helix-0.6.2-release b706ddac1 -> 48a765881
[HELIX-130] Add a zk copy tool, rb=22986
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/48a76588
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/48a76588
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/48a76588
Branch: refs/heads/helix-0.6.2-release
Commit: 48a765881545fa85b523ed63b6133eca7b17d17b
Parents: b706dda
Author: zzhang <zz...@apache.org>
Authored: Wed Jun 25 16:42:53 2014 -0700
Committer: zzhang <zz...@apache.org>
Committed: Wed Jun 25 16:42:53 2014 -0700
----------------------------------------------------------------------
helix-core/pom.xml | 4 +
.../java/org/apache/helix/tools/ZkCopy.java | 193 +++++++++++++++++++
.../java/org/apache/helix/tools/TestZkCopy.java | 67 +++++++
3 files changed, 264 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/48a76588/helix-core/pom.xml
----------------------------------------------------------------------
diff --git a/helix-core/pom.xml b/helix-core/pom.xml
index a62fe55..18fe346 100644
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@ -198,6 +198,10 @@ under the License.
<name>zk-dumper</name>
</program>
<program>
+ <mainClass>org.apache.helix.tools.ZkCopy</mainClass>
+ <name>zkcopy</name>
+ </program>
+ <program>
<mainClass>org.apache.helix.tools.ZKLogFormatter</mainClass>
<name>zk-log-parser</name>
</program>
http://git-wip-us.apache.org/repos/asf/helix/blob/48a76588/helix-core/src/main/java/org/apache/helix/tools/ZkCopy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ZkCopy.java b/helix-core/src/main/java/org/apache/helix/tools/ZkCopy.java
new file mode 100644
index 0000000..69369a5
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ZkCopy.java
@@ -0,0 +1,193 @@
+package org.apache.helix.tools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.net.URI;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.manager.zk.ByteArraySerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Tool for recursively copying a ZooKeeper path to another ZooKeeper path (only zk:// URIs are supported)
+ */
+public class ZkCopy {
+ /** URI schemes understood by this tool; only {@code zk} is defined. */
+ enum ZkCopyScheme {
+   zk
+ }
+
+ // The logger is never reassigned, so it is declared final.
+ private static final Logger logger = Logger.getLogger(ZkCopy.class);
+
+ // Long-option names of the source and destination URI arguments (--src / --dst).
+ private static final String src = "src";
+ private static final String dst = "dst";
+
+ /**
+  * Builds the command-line options accepted by this tool: the required
+  * {@code --src} and {@code --dst} options, each taking one URI argument.
+  *
+  * @return the options to pass to the command-line parser and the usage printer
+  */
+ @SuppressWarnings("static-access")
+ private static Options constructCmdLineOpt() {
+   // The example in each argument name is now closed with ")" so the help text reads correctly.
+   Option srcOpt =
+       OptionBuilder.withLongOpt(src).hasArgs(1).isRequired(true)
+           .withArgName("source-URI (e.g. zk://localhost:2181/src-path)")
+           .withDescription("Provide source URI").create();
+
+   Option dstOpt =
+       OptionBuilder.withLongOpt(dst).hasArgs(1).isRequired(true)
+           .withArgName("destination-URI (e.g. zk://localhost:2181/dst-path)")
+           .withDescription("Provide destination URI").create();
+
+   Options options = new Options();
+   options.addOption(srcOpt);
+   options.addOption(dstOpt);
+   return options;
+ }
+
+ /**
+  * Prints the help text for the given options, using a formatter width of 1000
+  * and "java " followed by this class's name as the command-line syntax.
+  *
+  * @param cliOptions the options to describe
+  */
+ private static void printUsage(Options cliOptions) {
+   final String commandSyntax = "java " + ZkCopy.class.getName();
+   HelpFormatter formatter = new HelpFormatter();
+   formatter.setWidth(1000);
+   formatter.printHelp(commandSyntax, cliOptions);
+ }
+
+ /**
+  * Joins {@code path} and {@code suffix} with exactly one "/" between them.
+  *
+  * @param path the leading part; must not be null
+  * @param suffix the part to append; when null or empty, {@code path} is returned unchanged
+  * @return the joined path
+  */
+ private static String concatenate(String path, String suffix) {
+   if (suffix == null || suffix.isEmpty()) {
+     return path;
+   }
+
+   boolean pathEndsWithSlash = path.endsWith("/");
+   boolean suffixStartsWithSlash = suffix.startsWith("/");
+
+   if (pathEndsWithSlash && suffixStartsWithSlash) {
+     // Both sides bring a separator (e.g. "/" + "/child"): keep only one to avoid "//child"
+     return path + suffix.substring(1);
+   }
+   if (pathEndsWithSlash || suffixStartsWithSlash) {
+     return path + suffix;
+   }
+   return path + "/" + suffix;
+ }
+
+ /**
+  * Copies the znode at {@code srcPath} and its descendants to {@code dstPath},
+  * walking the source tree breadth-first. Each znode's data is read from
+  * {@code srcClient} and written to {@code dstClient} as a persistent znode;
+  * ephemeral znodes are skipped.
+  *
+  * @param srcClient client used to read the source tree
+  * @param srcPath root of the tree to copy; a trailing "/" is ignored
+  * @param dstClient client used to create the copied znodes
+  * @param dstPath root of the copy; a trailing "/" is ignored
+  * @throws IllegalArgumentException if one path is an ancestor of the other
+  *         (paths are also checked with {@code PathUtils.validatePath})
+  */
+ private static void zkCopy(ZkClient srcClient, String srcPath, ZkClient dstClient, String dstPath) {
+   // Strip off trailing "/"
+   if (!srcPath.equals("/") && srcPath.endsWith("/")) {
+     srcPath = srcPath.substring(0, srcPath.length() - 1);
+   }
+
+   if (!dstPath.equals("/") && dstPath.endsWith("/")) {
+     dstPath = dstPath.substring(0, dstPath.length() - 1);
+   }
+
+   // Validate paths
+   PathUtils.validatePath(srcPath);
+   PathUtils.validatePath(dstPath);
+
+   // Identical paths are a no-op only when both sides use the same client; with two
+   // different clients the same path is a legitimate copy target and must not be skipped.
+   if (srcClient == dstClient && srcPath.equals(dstPath)) {
+     logger.info("srcPath == dstPath. Skip copying");
+     return;
+   }
+
+   // Compare whole path components: "/foo" and "/foobar" share a string prefix but are
+   // siblings, so a plain startsWith() check would reject them wrongly. The restriction is
+   // kept for every client combination because two distinct clients may still be connected
+   // to the same ZooKeeper ensemble.
+   if (isAncestorPath(dstPath, srcPath) || isAncestorPath(srcPath, dstPath)) {
+     throw new IllegalArgumentException(
+         "srcPath/dstPath can't be prefix of dstPath/srcPath, was srcPath: " + srcPath
+             + ", dstPath: " + dstPath);
+   }
+
+   // Recursive copy using BFS; queue entries are paths relative to srcPath ("" is the root)
+   LinkedList<String> queue = new LinkedList<String>();
+   queue.add("");
+   while (!queue.isEmpty()) {
+     String path = queue.removeFirst();
+     Stat stat = new Stat();
+     String fromPath = concatenate(srcPath, path);
+     Object data = srcClient.readDataAndStat(fromPath, stat, false);
+     if (stat.getEphemeralOwner() != 0) {
+       logger.info("Skip copying ephemeral znode: " + fromPath);
+       continue;
+     }
+     String toPath = concatenate(dstPath, path);
+     System.out.println("Copy " + fromPath + " to " + toPath);
+     dstClient.createPersistent(toPath, data);
+     List<String> children = srcClient.getChildren(fromPath);
+     if (children != null) {
+       for (String child : children) {
+         queue.add(concatenate(path, child));
+       }
+     }
+   }
+ }
+
+ /**
+  * Tells whether {@code ancestor} is a strict ancestor of {@code path} in the znode tree.
+  * Both arguments are expected to be different, normalized paths without a trailing "/".
+  */
+ private static boolean isAncestorPath(String ancestor, String path) {
+   if (ancestor.equals("/")) {
+     // The root is an ancestor of every other path
+     return true;
+   }
+   return path.startsWith(ancestor + "/");
+ }
+
+ /**
+  * Command-line entry point: parses {@code --src} and {@code --dst}, connects to the
+  * ZooKeeper address given by each URI's authority and copies the source path to the
+  * destination path. Prints an error and exits with status 1 when the options cannot
+  * be parsed, a scheme is missing or unsupported, or a URI has no authority.
+  *
+  * @param args command-line arguments
+  * @throws Exception if a URI is malformed or the copy fails
+  */
+ public static void main(String[] args) throws Exception {
+   CommandLineParser cliParser = new GnuParser();
+   Options cliOptions = constructCmdLineOpt();
+   CommandLine cmd = null;
+
+   try {
+     cmd = cliParser.parse(cliOptions, args);
+   } catch (ParseException pe) {
+     System.err.println("CommandLineClient: failed to parse command-line options: "
+         + pe.toString());
+     printUsage(cliOptions);
+     System.exit(1);
+   }
+
+   URI srcUri = new URI(cmd.getOptionValue(src));
+   URI dstUri = new URI(cmd.getOptionValue(dst));
+
+   // Enum.valueOf() throws on a null or unknown name, which made the "Unsupported scheme"
+   // report below unreachable; resolve the schemes without throwing instead.
+   ZkCopyScheme srcScheme = parseScheme(srcUri);
+   ZkCopyScheme dstScheme = parseScheme(dstUri);
+
+   if (srcScheme != ZkCopyScheme.zk || dstScheme != ZkCopyScheme.zk) {
+     System.err.println("Unsupported scheme. srcScheme: " + srcUri.getScheme() + ", dstScheme: "
+         + dstUri.getScheme());
+     System.exit(1);
+     return;
+   }
+
+   String srcZkAddr = srcUri.getAuthority();
+   String dstZkAddr = dstUri.getAuthority();
+   if (srcZkAddr == null || dstZkAddr == null) {
+     // e.g. "zk:/path" has a scheme but no host:port part
+     System.err.println("Missing zk address. src: " + srcUri + ", dst: " + dstUri);
+     System.exit(1);
+     return;
+   }
+
+   ZkClient srcClient = null;
+   ZkClient dstClient = null;
+   try {
+     srcClient =
+         new ZkClient(srcZkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
+             ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ByteArraySerializer());
+     if (srcZkAddr.equals(dstZkAddr)) {
+       // Same address: share one connection for reading and writing
+       dstClient = srcClient;
+     } else {
+       dstClient =
+           new ZkClient(dstZkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
+               ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ByteArraySerializer());
+     }
+     String srcPath = srcUri.getPath();
+     String dstPath = dstUri.getPath();
+     zkCopy(srcClient, srcPath, dstClient, dstPath);
+   } finally {
+     if (srcClient != null) {
+       srcClient.close();
+     }
+     // Do not close the shared client a second time
+     if (dstClient != null && dstClient != srcClient) {
+       dstClient.close();
+     }
+   }
+ }
+
+ /**
+  * Resolves the scheme of {@code uri} to a {@link ZkCopyScheme}.
+  *
+  * @return the matching scheme, or null when the URI has no scheme or an unknown one
+  */
+ private static ZkCopyScheme parseScheme(URI uri) {
+   String scheme = uri.getScheme();
+   if (scheme == null) {
+     return null;
+   }
+   for (ZkCopyScheme candidate : ZkCopyScheme.values()) {
+     if (candidate.name().equals(scheme)) {
+       return candidate;
+     }
+   }
+   return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/48a76588/helix-core/src/test/java/org/apache/helix/tools/TestZkCopy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestZkCopy.java b/helix-core/src/test/java/org/apache/helix/tools/TestZkCopy.java
new file mode 100644
index 0000000..21a1ad5
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestZkCopy.java
@@ -0,0 +1,67 @@
+package org.apache.helix.tools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Creates a two-level tree of znodes under a "from" path, copies it to a "to"
+ * path through {@link ZkCopy#main(String[])} and checks that every leaf arrived
+ * with its record id intact.
+ */
+public class TestZkCopy extends ZkUnitTestBase {
+
+  @Test
+  public void test() throws Exception {
+    // Number of znodes created on each of the two levels
+    final int fanOut = 5;
+
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date());
+
+    // Populate the source tree: <root>/from/<outer>/<inner>, each leaf holding a ZNRecord
+    String sourceRoot = "/" + clusterName + "/from";
+    _gZkClient.createPersistent(sourceRoot, true);
+    for (int outer = 0; outer < fanOut; outer++) {
+      for (int inner = 0; inner < fanOut; inner++) {
+        String leafPath = String.format("%s/%d/%d", sourceRoot, outer, inner);
+        String recordId = String.format("%d/%d", outer, inner);
+        _gZkClient.createPersistent(leafPath, true);
+        _gZkClient.writeData(leafPath, new ZNRecord(recordId));
+      }
+    }
+
+    // Copy
+    String targetRoot = "/" + clusterName + "/to";
+    String[] copyArgs = {
+        "--src", "zk://" + ZK_ADDR + sourceRoot,
+        "--dst", "zk://" + ZK_ADDR + targetRoot
+    };
+    ZkCopy.main(copyArgs);
+
+    // Verify: the target root exists without data, and every leaf carries the expected id
+    Assert.assertTrue(_gZkClient.exists(targetRoot));
+    Assert.assertNull(_gZkClient.readData(targetRoot));
+    for (int outer = 0; outer < fanOut; outer++) {
+      for (int inner = 0; inner < fanOut; inner++) {
+        String copiedPath = String.format("%s/%d/%d", targetRoot, outer, inner);
+        Assert.assertTrue(_gZkClient.exists(copiedPath));
+        ZNRecord copied = _gZkClient.readData(copiedPath);
+        Assert.assertEquals(String.format("%d/%d", outer, inner), copied.getId());
+      }
+    }
+
+    System.out.println("END " + clusterName + " at " + new Date());
+  }
+}