You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ba...@apache.org on 2016/04/22 01:48:22 UTC
[1/2] falcon git commit: FALCON-1861 Support HDFS Snapshot based
replication in Falcon
Repository: falcon
Updated Branches:
refs/heads/master 7c0481eac -> aba79aae2
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java b/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
index 1457b06..29fcdb9 100644
--- a/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
+++ b/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
@@ -40,11 +40,14 @@ public final class EvictionHelper {
private EvictionHelper(){}
public static Pair<Date, Date> getDateRange(String period) throws ELException {
- Long duration = (Long) EVALUATOR.evaluate("${" + period + "}",
- Long.class, RESOLVER, RESOLVER);
+ Long duration = evalExpressionToMilliSeconds(period);
Date end = new Date();
Date start = new Date(end.getTime() - duration);
return Pair.of(start, end);
}
+    /**
+     * Evaluates a period expression such as {@code days(3)} or {@code minutes(5)}
+     * by wrapping it in <code>${...}</code> and handing it to the EL evaluator.
+     *
+     * @param period EL function call describing a duration
+     * @return the evaluated duration as a {@code Long} (milliseconds, per the method name)
+     * @throws ELException if the expression cannot be evaluated
+     */
+    public static Long evalExpressionToMilliSeconds(String period) throws ELException {
+        final String expression = "${" + period + "}";
+        final Object evaluated = EVALUATOR.evaluate(expression, Long.class, RESOLVER, RESOLVER);
+        return (Long) evaluated;
+    }
+
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index b7bac73..7a850f8 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -73,7 +73,7 @@
*.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory
##### List of shared libraries for Falcon workflows #####
-*.shared.libs=activemq-all,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3
+*.shared.libs=activemq-all,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3,commons-el
##### Workflow Job Execution Completion listeners #####
*.workflow.execution.listeners=
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/common/src/test/java/org/apache/falcon/retention/EvictionHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/retention/EvictionHelperTest.java b/common/src/test/java/org/apache/falcon/retention/EvictionHelperTest.java
new file mode 100644
index 0000000..ce40068
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/retention/EvictionHelperTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.retention;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for EvictionHelper.
+ */
+public class EvictionHelperTest {
+ @Test
+ public void testEvictionHelper() throws Exception {
+ Assert.assertEquals(EvictionHelper.evalExpressionToMilliSeconds("days(3)").longValue(), 259200000);
+ Assert.assertEquals(EvictionHelper.evalExpressionToMilliSeconds("days(1)").longValue(), 86400000);
+ Assert.assertEquals(EvictionHelper.evalExpressionToMilliSeconds("hours(5)").longValue(), 18000000);
+ Assert.assertEquals(EvictionHelper.evalExpressionToMilliSeconds("minutes(5)").longValue(), 300000);
+ Assert.assertEquals(EvictionHelper.evalExpressionToMilliSeconds("minutes(1)").longValue(), 60000);
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/pom.xml
----------------------------------------------------------------------
diff --git a/extensions/pom.xml b/extensions/pom.xml
index 6a0725a..eb2faea 100644
--- a/extensions/pom.xml
+++ b/extensions/pom.xml
@@ -43,6 +43,30 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</profile>
</profiles>
@@ -77,6 +101,11 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.falcon</groupId>
+ <artifactId>falcon-test-util</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/src/main/java/org/apache/falcon/extensions/AbstractExtension.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/AbstractExtension.java b/extensions/src/main/java/org/apache/falcon/extensions/AbstractExtension.java
index 11b3725..24bbb87 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/AbstractExtension.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/AbstractExtension.java
@@ -20,6 +20,7 @@ package org.apache.falcon.extensions;
import org.apache.falcon.FalconException;
import org.apache.falcon.extensions.mirroring.hdfs.HdfsMirroringExtension;
+import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirroringExtension;
import org.apache.falcon.extensions.mirroring.hive.HiveMirroringExtension;
import java.util.ArrayList;
@@ -33,12 +34,14 @@ import java.util.Properties;
public abstract class AbstractExtension {
private static final List<String> TRUSTED_EXTENSIONS = Arrays.asList(
new HdfsMirroringExtension().getName().toUpperCase(),
+ new HdfsSnapshotMirroringExtension().getName().toUpperCase(),
new HiveMirroringExtension().getName().toUpperCase());
private static List<AbstractExtension> extensions = new ArrayList<>();
public static List<AbstractExtension> getExtensions() {
if (extensions.isEmpty()) {
extensions.add(new HdfsMirroringExtension());
+ extensions.add(new HdfsSnapshotMirroringExtension());
extensions.add(new HiveMirroringExtension());
}
return extensions;
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirrorProperties.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirrorProperties.java b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirrorProperties.java
new file mode 100644
index 0000000..f179896
--- /dev/null
+++ b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirrorProperties.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.extensions.mirroring.hdfsSnapshot;
+
+/**
+ * Properties understood by the HDFS snapshot mirroring extension.
+ *
+ * Each constant carries the property key, a human readable description and a flag
+ * telling whether the property is required.
+ */
+public enum HdfsSnapshotMirrorProperties {
+    SOURCE_CLUSTER("sourceCluster", "Snapshot replication source cluster", true),
+    SOURCE_NN("sourceNN", "Snapshot replication source cluster namenode", false),
+    SOURCE_EXEC_URL("sourceExecUrl", "Snapshot replication source execute endpoint", false),
+    SOURCE_NN_KERBEROS_PRINCIPAL("sourceNNKerberosPrincipal",
+            "Snapshot replication source kerberos principal", false),
+
+    SOURCE_SNAPSHOT_DIR("sourceSnapshotDir", "Location of source snapshot path", true),
+    SOURCE_SNAPSHOT_RETENTION_POLICY("sourceSnapshotRetentionPolicy", "Retention policy for source snapshots", false),
+    SOURCE_SNAPSHOT_RETENTION_AGE_LIMIT("sourceSnapshotRetentionAgeLimit",
+            "Delete source snapshots older than this age", true),
+    SOURCE_SNAPSHOT_RETENTION_NUMBER("sourceSnapshotRetentionNumber",
+            "Number of latest source snapshots to retain on source", true),
+
+    TARGET_CLUSTER("targetCluster", "Snapshot replication target cluster", true),
+    TARGET_NN("targetNN", "Snapshot replication target cluster namenode", false),
+    TARGET_EXEC_URL("targetExecUrl", "Snapshot replication target execute endpoint", false),
+    TARGET_NN_KERBEROS_PRINCIPAL("targetNNKerberosPrincipal",
+            "Snapshot replication target kerberos principal", false),
+
+    // Description mirrors SOURCE_SNAPSHOT_DIR; it previously described a Hive metastore uri.
+    TARGET_SNAPSHOT_DIR("targetSnapshotDir", "Location of target snapshot path", true),
+    TARGET_SNAPSHOT_RETENTION_POLICY("targetSnapshotRetentionPolicy", "Retention policy for target snapshots", false),
+    TARGET_SNAPSHOT_RETENTION_AGE_LIMIT("targetSnapshotRetentionAgeLimit",
+            "Delete target snapshots older than this age", true),
+    // Target snapshots are retained on the target; the description previously said "on source".
+    TARGET_SNAPSHOT_RETENTION_NUMBER("targetSnapshotRetentionNumber",
+            "Number of latest target snapshots to retain on target", true),
+
+    DISTCP_MAX_MAPS("distcpMaxMaps", "Maximum number of maps used during distcp", false),
+    MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used by each mapper during replication", false),
+
+    TDE_ENCRYPTION_ENABLED("tdeEncryptionEnabled", "Is TDE encryption enabled on source and target", false),
+    SNAPSHOT_JOB_NAME("snapshotJobName", "Name of snapshot based mirror job", false);
+
+
+    private final String name;
+    private final String description;
+    private final boolean isRequired;
+
+    HdfsSnapshotMirrorProperties(String name, String description, boolean isRequired) {
+        this.name = name;
+        this.description = description;
+        this.isRequired = isRequired;
+    }
+
+    /** @return the property key as it appears in the extension properties */
+    public String getName() {
+        return this.name;
+    }
+
+    /** @return the human readable description of the property */
+    public String getDescription() {
+        return description;
+    }
+
+    /** @return true if the property is flagged as required */
+    public boolean isRequired() {
+        return isRequired;
+    }
+
+    /** @return the property key, same as {@link #getName()} */
+    @Override
+    public String toString() {
+        return getName();
+    }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirroringExtension.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirroringExtension.java b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirroringExtension.java
new file mode 100644
index 0000000..09cce3b
--- /dev/null
+++ b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirroringExtension.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.extensions.mirroring.hdfsSnapshot;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.extensions.AbstractExtension;
+import org.apache.falcon.extensions.ExtensionProperties;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.security.SecurityUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Hdfs snapshot mirroring extension.
+ */
+public class HdfsSnapshotMirroringExtension extends AbstractExtension {
+ private static final Logger LOG = LoggerFactory.getLogger(HdfsSnapshotMirroringExtension.class);
+ private static final String EXTENSION_NAME = "HDFS-SNAPSHOT-MIRRORING";
+ private static final String DEFAULT_RETENTION_POLICY = "delete";
+ public static final String EMPTY_KERBEROS_PRINCIPAL = "NA";
+
+ @Override
+ public String getName() {
+ return EXTENSION_NAME;
+ }
+
+ @Override
+ public void validate(final Properties extensionProperties) throws FalconException {
+ for (HdfsSnapshotMirrorProperties option : HdfsSnapshotMirrorProperties.values()) {
+ if (extensionProperties.getProperty(option.getName()) == null && option.isRequired()) {
+ throw new FalconException("Missing extension property: " + option.getName());
+ }
+ }
+
+ Cluster sourceCluster = ClusterHelper.getCluster(extensionProperties.getProperty(
+ HdfsSnapshotMirrorProperties.SOURCE_CLUSTER.getName()));
+ if (sourceCluster == null) {
+ throw new FalconException("SourceCluster entity "
+ + HdfsSnapshotMirrorProperties.SOURCE_CLUSTER.getName() + " not found");
+ }
+ Cluster targetCluster = ClusterHelper.getCluster(extensionProperties.getProperty(
+ HdfsSnapshotMirrorProperties.TARGET_CLUSTER.getName()));
+ if (targetCluster == null) {
+ throw new FalconException("TargetCluster entity "
+ + HdfsSnapshotMirrorProperties.TARGET_CLUSTER.getName() + " not found");
+ }
+
+ Configuration sourceConf = ClusterHelper.getConfiguration(sourceCluster);
+ Configuration targetConf = ClusterHelper.getConfiguration(targetCluster);
+ DistributedFileSystem sourceFileSystem =
+ HadoopClientFactory.get().createDistributedProxiedFileSystem(sourceConf);
+ DistributedFileSystem targetFileSystem =
+ HadoopClientFactory.get().createDistributedProxiedFileSystem(targetConf);
+
+ Path sourcePath = new Path(extensionProperties.getProperty(
+ HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName()));
+ Path targetPath = new Path(extensionProperties.getProperty(
+ HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName()));
+
+ // check if source and target path's exist and are snapshot-able
+ try {
+ if (sourceFileSystem.exists(sourcePath)) {
+ if (!isDirSnapshotable(sourceFileSystem, sourcePath)) {
+ throw new FalconException(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName()
+ + " " + sourcePath.toString() + " does not allow snapshots.");
+ }
+ } else {
+ throw new FalconException(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName()
+ + " " + sourcePath.toString() + " does not exist.");
+ }
+ if (targetFileSystem.exists(targetPath)) {
+ if (!isDirSnapshotable(targetFileSystem, targetPath)) {
+ throw new FalconException(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName()
+ + " " + targetPath.toString() + " does not allow snapshots.");
+ }
+ } else {
+ throw new FalconException(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName()
+ + " " + targetPath.toString() + " does not exist.");
+ }
+ } catch (IOException e) {
+ throw new FalconException(e.getMessage(), e);
+ }
+
+
+ }
+
+ private static boolean isDirSnapshotable(DistributedFileSystem hdfs, Path path) throws FalconException {
+ try {
+ LOG.debug("HDFS Snapshot extension validating if dir {} is snapshotable.", path.toString());
+ SnapshottableDirectoryStatus[] snapshotableDirs = hdfs.getSnapshottableDirListing();
+ if (snapshotableDirs != null && snapshotableDirs.length > 0) {
+ for (SnapshottableDirectoryStatus dir : snapshotableDirs) {
+ if (dir.getFullPath().toString().equals(path.toString())) {
+ return true;
+ }
+ }
+ }
+ return false;
+ } catch (IOException e) {
+ LOG.error("Unable to verify if dir {} is snapshot-able. {}", path.toString(), e.getMessage());
+ throw new FalconException("Unable to verify if dir " + path.toString() + " is snapshot-able", e);
+ }
+ }
+
+ @Override
+ public Properties getAdditionalProperties(final Properties extensionProperties) throws FalconException {
+ Properties additionalProperties = new Properties();
+
+ // Add default properties if not passed
+ String distcpMaxMaps = extensionProperties.getProperty(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName());
+ if (StringUtils.isBlank(distcpMaxMaps)) {
+ additionalProperties.put(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName(), "1");
+ }
+
+ String distcpMapBandwidth = extensionProperties.getProperty(
+ HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName());
+ if (StringUtils.isBlank(distcpMapBandwidth)) {
+ additionalProperties.put(HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName(), "100");
+ }
+
+ String tdeEnabled = extensionProperties.getProperty(
+ HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName());
+ if (StringUtils.isNotBlank(tdeEnabled) && Boolean.parseBoolean(tdeEnabled)) {
+ additionalProperties.put(HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName(), "true");
+ } else {
+ additionalProperties.put(HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName(), "false");
+ }
+
+ // Add sourceCluster properties
+ Cluster sourceCluster = ClusterHelper.getCluster(extensionProperties.getProperty(
+ HdfsSnapshotMirrorProperties.SOURCE_CLUSTER.getName()));
+ if (sourceCluster == null) {
+ LOG.error("Cluster entity {} not found", HdfsSnapshotMirrorProperties.SOURCE_CLUSTER.getName());
+ throw new FalconException("Cluster entity "
+ + HdfsSnapshotMirrorProperties.SOURCE_CLUSTER.getName() + " not found");
+ }
+ additionalProperties.put(HdfsSnapshotMirrorProperties.SOURCE_NN.getName(),
+ ClusterHelper.getStorageUrl(sourceCluster));
+ additionalProperties.put(HdfsSnapshotMirrorProperties.SOURCE_EXEC_URL.getName(),
+ ClusterHelper.getMREndPoint(sourceCluster));
+ String sourceKerberosPrincipal = ClusterHelper.getPropertyValue(sourceCluster, SecurityUtil.NN_PRINCIPAL);
+ if (StringUtils.isBlank(sourceKerberosPrincipal)) {
+ sourceKerberosPrincipal = EMPTY_KERBEROS_PRINCIPAL;
+ }
+ additionalProperties.put(HdfsSnapshotMirrorProperties.SOURCE_NN_KERBEROS_PRINCIPAL.getName(),
+ sourceKerberosPrincipal);
+
+ String sourceRetentionPolicy = extensionProperties.getProperty(
+ HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_POLICY.getName());
+ if (StringUtils.isBlank(sourceRetentionPolicy)) {
+ sourceRetentionPolicy = DEFAULT_RETENTION_POLICY;
+ }
+ validateRetentionPolicy(sourceRetentionPolicy);
+ additionalProperties.put(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_POLICY.getName(),
+ sourceRetentionPolicy);
+
+ // Add targetCluster properties
+ Cluster targetCluster = ClusterHelper.getCluster(extensionProperties.getProperty(
+ HdfsSnapshotMirrorProperties.TARGET_CLUSTER.getName()));
+ if (targetCluster == null) {
+ LOG.error("Cluster entity {} not found", HdfsSnapshotMirrorProperties.TARGET_CLUSTER.getName());
+ throw new FalconException("Cluster entity "
+ + HdfsSnapshotMirrorProperties.TARGET_CLUSTER.getName() + " not found");
+ }
+ additionalProperties.put(HdfsSnapshotMirrorProperties.TARGET_NN.getName(),
+ ClusterHelper.getStorageUrl(targetCluster));
+ additionalProperties.put(HdfsSnapshotMirrorProperties.TARGET_EXEC_URL.getName(),
+ ClusterHelper.getMREndPoint(targetCluster));
+ String targetKerberosPrincipal = ClusterHelper.getPropertyValue(targetCluster, SecurityUtil.NN_PRINCIPAL);
+ if (StringUtils.isBlank(targetKerberosPrincipal)) {
+ targetKerberosPrincipal = EMPTY_KERBEROS_PRINCIPAL;
+ }
+ additionalProperties.put(HdfsSnapshotMirrorProperties.TARGET_NN_KERBEROS_PRINCIPAL.getName(),
+ targetKerberosPrincipal);
+
+ String targetRetentionPolicy = extensionProperties.getProperty(
+ HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_POLICY.getName());
+ if (StringUtils.isBlank(targetRetentionPolicy)) {
+ targetRetentionPolicy = DEFAULT_RETENTION_POLICY;
+ }
+ validateRetentionPolicy(targetRetentionPolicy);
+ additionalProperties.put(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_POLICY.getName(),
+ targetRetentionPolicy);
+
+ // Add jobName and jobCluster properties.
+ String jobName = extensionProperties.getProperty(ExtensionProperties.JOB_NAME.getName());
+ if (StringUtils.isBlank(jobName)) {
+ throw new FalconException("Property "
+ + ExtensionProperties.JOB_NAME.getName() + " cannot be null");
+ }
+ additionalProperties.put(HdfsSnapshotMirrorProperties.SNAPSHOT_JOB_NAME.getName(), jobName);
+
+ String jobClusterName = extensionProperties.getProperty(ExtensionProperties.CLUSTER_NAME.getName());
+ Cluster jobCluster = ClusterHelper.getCluster(jobClusterName);
+ if (jobCluster == null) {
+ LOG.error("Cluster entity {} not found", ExtensionProperties.CLUSTER_NAME.getName());
+ throw new FalconException("Cluster entity "
+ + ExtensionProperties.CLUSTER_NAME.getName() + " not found");
+ }
+ return additionalProperties;
+ }
+
+ public static void validateRetentionPolicy(String retentionPolicy) throws FalconException {
+ if (!retentionPolicy.equalsIgnoreCase("delete")) {
+ throw new FalconException("Retention policy \"" + retentionPolicy + "\" is invalid");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/src/main/java/org/apache/falcon/extensions/util/ExtensionProcessBuilderUtils.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/util/ExtensionProcessBuilderUtils.java b/extensions/src/main/java/org/apache/falcon/extensions/util/ExtensionProcessBuilderUtils.java
index 92e9805..9e23894 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/util/ExtensionProcessBuilderUtils.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/util/ExtensionProcessBuilderUtils.java
@@ -44,9 +44,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import java.util.TimeZone;
/**
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java
----------------------------------------------------------------------
diff --git a/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java b/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java
index ffd9336..b14d500 100644
--- a/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java
+++ b/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java
@@ -20,6 +20,7 @@ package org.apache.falcon.extensions;
import junit.framework.Assert;
import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.MiniHdfsClusterUtil;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.parser.EntityParserFactory;
@@ -34,12 +35,22 @@ import org.apache.falcon.entity.v0.process.PolicyType;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.extensions.mirroring.hdfs.HdfsMirroringExtension;
import org.apache.falcon.extensions.mirroring.hdfs.HdfsMirroringExtensionProperties;
+import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirrorProperties;
+import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirroringExtension;
import org.apache.falcon.extensions.store.AbstractTestExtensionStore;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.ByteArrayInputStream;
+import java.io.File;
import java.io.InputStream;
+import java.nio.file.Files;
import java.util.List;
import java.util.Properties;
@@ -58,9 +69,18 @@ public class ExtensionTest extends AbstractTestExtensionStore {
private static final String SOURCE_CLUSTER = "primaryCluster";
private static final String TARGETDIR = "/users/target/file1";
private static final String TARGET_CLUSTER = "backupCluster";
+ private static final String NN_URI = "hdfs://localhost:54314";
+ private static final String RETENTION_POLICY = "delete";
+ private static final String RETENTION_AGE = "mins(5)";
+ private static final String RETENTION_NUM = "7";
+ private static final String TARGET_KERBEROS_PRINCIPAL = "nn/backup@REALM";
+
private Extension extension;
+ private MiniDFSCluster miniDFSCluster;
+ private DistributedFileSystem miniDfs;
+ private File baseDir;
- private static Properties getHdfsProperties() {
+ private static Properties getCommonProperties() {
Properties properties = new Properties();
properties.setProperty(ExtensionProperties.JOB_NAME.getName(),
JOB_NAME);
@@ -72,6 +92,11 @@ public class ExtensionTest extends AbstractTestExtensionStore {
VALIDITY_END);
properties.setProperty(ExtensionProperties.FREQUENCY.getName(),
FREQUENCY);
+ return properties;
+ }
+
+ private static Properties getHdfsProperties() {
+ Properties properties = getCommonProperties();
properties.setProperty(HdfsMirroringExtensionProperties.SOURCE_DIR.getName(),
SOURCEDIR);
properties.setProperty(HdfsMirroringExtensionProperties.SOURCE_CLUSTER.getName(),
@@ -84,10 +109,52 @@ public class ExtensionTest extends AbstractTestExtensionStore {
return properties;
}
+ private static Properties getHdfsSnapshotExtensionProperties() {
+ Properties properties = getCommonProperties();
+ properties.setProperty(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName(),
+ SOURCEDIR);
+ properties.setProperty(HdfsSnapshotMirrorProperties.SOURCE_CLUSTER.getName(),
+ SOURCE_CLUSTER);
+ properties.setProperty(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_POLICY.getName(),
+ RETENTION_POLICY);
+ properties.setProperty(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_AGE_LIMIT.getName(),
+ RETENTION_AGE);
+ properties.setProperty(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_NUMBER.getName(),
+ RETENTION_NUM);
+ properties.setProperty(HdfsSnapshotMirrorProperties.SOURCE_NN.getName(),
+ NN_URI);
+
+ properties.setProperty(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName(),
+ TARGETDIR);
+ properties.setProperty(HdfsSnapshotMirrorProperties.TARGET_CLUSTER.getName(),
+ TARGET_CLUSTER);
+ properties.setProperty(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_POLICY.getName(),
+ RETENTION_POLICY);
+ properties.setProperty(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_AGE_LIMIT.getName(),
+ RETENTION_AGE);
+ properties.setProperty(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_NUMBER.getName(),
+ RETENTION_NUM);
+ properties.setProperty(HdfsSnapshotMirrorProperties.TARGET_NN.getName(),
+ NN_URI);
+ properties.setProperty(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName(),
+ "5");
+ properties.setProperty(HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName(),
+ "100");
+ properties.setProperty(HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName(),
+ "false");
+
+ return properties;
+ }
+
@BeforeClass
public void init() throws Exception {
extension = new Extension();
+ baseDir = Files.createTempDirectory("test_extensions_hdfs").toFile().getAbsoluteFile(); // fresh temp dir passed to the mini DFS helper
+ miniDFSCluster = MiniHdfsClusterUtil.initMiniDfs(MiniHdfsClusterUtil.EXTENSION_TEST_PORT, baseDir); // NOTE(review): presumably starts the mini cluster on that port - confirm in MiniHdfsClusterUtil
initClusters();
+ miniDfs = miniDFSCluster.getFileSystem();
+ miniDfs.mkdirs(new Path(SOURCEDIR), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); // source dir, all actions for user/group/other
+ miniDfs.mkdirs(new Path(TARGETDIR), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); // target dir, same permissions
}
private void initClusters() throws Exception {
@@ -157,4 +224,100 @@ public class ExtensionTest extends AbstractTestExtensionStore {
extension.getEntities(new HdfsMirroringExtension().getName(), props);
}
+
+    /**
+     * Generates entities for the HDFS snapshot mirroring extension and checks that exactly one
+     * process entity comes back whose name, cluster, validity, frequency, timezone, workflow,
+     * properties and retry settings match the values supplied by this test.
+     *
+     * @throws Exception if entity generation, parsing or a mini DFS call fails
+     */
+    @Test
+    public void testGetExtensionEntitiesForHdfsSnapshotMirroring() throws Exception {
+        ProcessEntityParser parser = (ProcessEntityParser) EntityParserFactory.getParser(EntityType.PROCESS);
+
+        // Mark both directories as snapshottable before asking the extension for entities.
+        miniDfs.allowSnapshot(new Path(SOURCEDIR));
+        miniDfs.allowSnapshot(new Path(TARGETDIR));
+
+        List<Entity> entities = extension.getEntities(new HdfsSnapshotMirroringExtension().getName(),
+                getHdfsSnapshotExtensionProperties());
+        if (entities == null || entities.isEmpty()) {
+            Assert.fail("Entities returned cannot be null or empty");
+        }
+
+        // TestNG's Assert.assertEquals takes (actual, expected); keeping that order makes a
+        // failure report the generated value as the one "found" rather than the constant.
+        Assert.assertEquals(entities.size(), 1);
+        Entity entity = entities.get(0);
+        Assert.assertEquals(entity.getEntityType(), EntityType.PROCESS);
+        // Feed the entity's string form back through the process parser.
+        parser.parse(new ByteArrayInputStream(entity.toString().getBytes()));
+
+        // Validate
+        Process processEntity = (Process) entity;
+        Assert.assertEquals(processEntity.getName(), JOB_NAME);
+        org.apache.falcon.entity.v0.process.Cluster jobCluster = processEntity.getClusters().
+                getClusters().get(0);
+        Assert.assertEquals(jobCluster.getName(), JOB_CLUSTER_NAME);
+        Assert.assertEquals(SchemaHelper.formatDateUTC(jobCluster.getValidity().getStart()), VALIDITY_START);
+        Assert.assertEquals(SchemaHelper.formatDateUTC(jobCluster.getValidity().getEnd()), VALIDITY_END);
+
+        Assert.assertEquals(processEntity.getFrequency().toString(), FREQUENCY);
+        Assert.assertEquals(processEntity.getTimezone().getID(), "UTC");
+
+        Assert.assertEquals(processEntity.getWorkflow().getEngine(), EngineType.OOZIE);
+        Assert.assertEquals(processEntity.getWorkflow().getLib(),
+                extensionStorePath + "/hdfs-snapshot-mirroring/libs");
+        Assert.assertEquals(processEntity.getWorkflow().getPath(),
+                extensionStorePath
+                        + "/hdfs-snapshot-mirroring/resources/runtime/hdfs-snapshot-mirroring-workflow.xml");
+
+        Properties props = EntityUtil.getEntityProperties(processEntity);
+
+        Assert.assertEquals(props.getProperty("sourceSnapshotDir"), SOURCEDIR);
+        Assert.assertEquals(props.getProperty("sourceCluster"), SOURCE_CLUSTER);
+        Assert.assertEquals(props.getProperty("targetSnapshotDir"), TARGETDIR);
+        Assert.assertEquals(props.getProperty("targetCluster"), TARGET_CLUSTER);
+        Assert.assertEquals(props.getProperty("snapshotJobName"), JOB_NAME);
+        Assert.assertEquals(props.getProperty("sourceNNKerberosPrincipal"),
+                HdfsSnapshotMirroringExtension.EMPTY_KERBEROS_PRINCIPAL);
+        Assert.assertEquals(props.getProperty("targetNNKerberosPrincipal"), TARGET_KERBEROS_PRINCIPAL);
+
+        //retry
+        Assert.assertEquals(processEntity.getRetry().getAttempts(), 3);
+        Assert.assertEquals(processEntity.getRetry().getPolicy(), PolicyType.PERIODIC);
+        Assert.assertEquals(processEntity.getRetry().getDelay().toString(), "minutes(30)");
+    }
+
+
+ @Test(dependsOnMethods = "testGetExtensionEntitiesForHdfsSnapshotMirroring",
+ expectedExceptions = FalconException.class,
+ expectedExceptionsMessageRegExp = "sourceSnapshotDir /users/source/file1 does not allow snapshots.")
+ public void testHdfsSnapshotMirroringNonSnapshotableDir() throws Exception {
+ miniDfs.disallowSnapshot(new Path(SOURCEDIR));
+
+ List<Entity> entities = extension.getEntities(new HdfsSnapshotMirroringExtension().getName(),
+ getHdfsSnapshotExtensionProperties());
+ if (entities == null || entities.isEmpty()) {
+ Assert.fail("Entities returned cannot be null or empty");
+ }
+ }
+
+    /**
+     * Drops the source cluster entry from an otherwise complete property set and expects entity
+     * generation to fail with a {@link FalconException} naming the missing property.
+     *
+     * @throws FalconException the expected failure reported by the extension
+     */
+    @Test(expectedExceptions = FalconException.class,
+            expectedExceptionsMessageRegExp = "Missing extension property: sourceCluster")
+    public void testGetExtensionEntitiesForHdfsSnapshotMirroringMissingProperties() throws FalconException {
+        Properties incomplete = getHdfsSnapshotExtensionProperties();
+        String omittedKey = HdfsSnapshotMirrorProperties.SOURCE_CLUSTER.getName();
+        incomplete.remove(omittedKey);
+
+        String extensionName = new HdfsSnapshotMirroringExtension().getName();
+        extension.getEntities(extensionName, incomplete);
+    }
+
+ @Test(dependsOnMethods = "testHdfsSnapshotMirroringNonSnapshotableDir",
+ expectedExceptions = FalconException.class,
+ expectedExceptionsMessageRegExp = "sourceSnapshotDir /users/source/file1 does not exist.")
+ public void testHdfsSnapshotMirroringNonExistingDir() throws Exception {
+ if (miniDfs.exists(new Path(SOURCEDIR))) {
+ miniDfs.delete(new Path(SOURCEDIR), true);
+ }
+
+ List<Entity> entities = extension.getEntities(new HdfsSnapshotMirroringExtension().getName(),
+ getHdfsSnapshotExtensionProperties());
+ if (entities == null || entities.isEmpty()) {
+ Assert.fail("Entities returned cannot be null or empty");
+ }
+ }
+
+ @AfterClass // class-level teardown for the mini DFS cluster started in init()
+ public void cleanup() throws Exception {
+ MiniHdfsClusterUtil.cleanupDfs(miniDFSCluster, baseDir); // shuts the cluster down and deletes baseDir
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
----------------------------------------------------------------------
diff --git a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
index 3462321..9dbacde 100644
--- a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
+++ b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
@@ -39,7 +39,11 @@ public class ExtensionStoreTest extends AbstractTestExtensionStore {
"hdfs-mirroring-template.xml", extensionStorePath
+ "/hdfs-mirroring/resources/runtime/hdfs-mirroring-template.xml",
"hdfs-mirroring-workflow.xml", extensionStorePath
- + "/hdfs-mirroring/resources/runtime/hdfs-mirroring-workflow.xml"
+ + "/hdfs-mirroring/resources/runtime/hdfs-mirroring-workflow.xml",
+ "hdfs-snapshot-mirroring-template.xml", extensionStorePath
+ + "/hdfs-mirroring/resources/runtime/hdfs-snapshot-mirroring-template.xml",
+ "hdfs-snapshot-mirroring-workflow.xml", extensionStorePath
+ + "/hdfs-mirroring/resources/runtime/hdfs-snapshot-mirroring-workflow.xml"
);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/src/test/resources/backup-cluster-0.1.xml
----------------------------------------------------------------------
diff --git a/extensions/src/test/resources/backup-cluster-0.1.xml b/extensions/src/test/resources/backup-cluster-0.1.xml
index c3ba6b9..27661be 100644
--- a/extensions/src/test/resources/backup-cluster-0.1.xml
+++ b/extensions/src/test/resources/backup-cluster-0.1.xml
@@ -22,7 +22,7 @@
<interfaces>
<interface type="readonly" endpoint="hftp://localhost:50010"
version="0.20.2"/>
- <interface type="write" endpoint="hdfs://localhost:8020"
+ <interface type="write" endpoint="hdfs://localhost:54134"
version="0.20.2"/>
<interface type="execute" endpoint="localhost:8021" version="0.20.2"/>
<interface type="workflow" endpoint="http://localhost:11000/oozie/"
@@ -40,5 +40,6 @@
<property name="field1" value="value1"/>
<property name="field2" value="value2"/>
<property name="hive.metastore.client.socket.timeout" value="20"/>
+ <property name="dfs.namenode.kerberos.principal" value="nn/backup@REALM"/>
</properties>
</cluster>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/src/test/resources/hdfs-snapshot-mirroring-template.xml
----------------------------------------------------------------------
diff --git a/extensions/src/test/resources/hdfs-snapshot-mirroring-template.xml b/extensions/src/test/resources/hdfs-snapshot-mirroring-template.xml
new file mode 100644
index 0000000..29131da
--- /dev/null
+++ b/extensions/src/test/resources/hdfs-snapshot-mirroring-template.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<process name="##jobName##" xmlns="uri:falcon:process:0.1">
+ <clusters>
+ <!-- source -->
+ <cluster name="##jobClusterName##">
+ <validity end="##jobValidityEnd##" start="##jobValidityStart##"/>
+ </cluster>
+ </clusters>
+
+ <tags/>
+
+ <parallel>1</parallel>
+ <!-- Replication needs to run only once to catch up -->
+ <order>LAST_ONLY</order>
+ <frequency>##jobFrequency##</frequency>
+ <timezone>##jobTimezone##</timezone>
+
+ <properties>
+ <property name="oozie.wf.subworkflow.classpath.inheritance" value="true"/>
+ </properties>
+
+ <workflow name="##jobWorkflowName##" engine="##jobWorkflowEngine##"
+ path="##jobWorkflowPath##" lib="##jobWorkflowLibPath##"/>
+ <retry policy="##jobRetryPolicy##" delay="##jobRetryDelay##" attempts="3"/>
+ <notification type="##jobNotificationType##" to="##jobNotificationReceivers##"/>
+ <ACL/>
+</process>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/src/test/resources/primary-cluster-0.1.xml
----------------------------------------------------------------------
diff --git a/extensions/src/test/resources/primary-cluster-0.1.xml b/extensions/src/test/resources/primary-cluster-0.1.xml
index a9694c2..f42924c 100644
--- a/extensions/src/test/resources/primary-cluster-0.1.xml
+++ b/extensions/src/test/resources/primary-cluster-0.1.xml
@@ -22,7 +22,7 @@
<interfaces>
<interface type="readonly" endpoint="hftp://localhost:50010"
version="0.20.2"/>
- <interface type="write" endpoint="hdfs://localhost:8020"
+ <interface type="write" endpoint="hdfs://localhost:54134"
version="0.20.2"/>
<interface type="execute" endpoint="localhost:8021" version="0.20.2"/>
<interface type="workflow" endpoint="http://localhost:11000/oozie/"
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f777dc9..8f4561c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,7 +122,7 @@
<activeByDefault>true</activeByDefault>
</activation>
<properties>
- <hadoop.version>2.6.2</hadoop.version>
+ <hadoop.version>2.7.1</hadoop.version>
</properties>
<dependencyManagement>
<dependencies>
@@ -403,6 +403,41 @@
<module>addons/hivedr</module>
</modules>
</profile>
+
+ <profile>
+ <id>hdfs-snapshot-mirroring</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <version>1.3.1</version>
+ <executions>
+ <execution>
+ <id>enforce-property</id>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ <configuration>
+ <rules>
+ <requireProperty>
+ <property>hadoop.version</property>
+ <!-- Accepts 2.7 and any later release, as the message below states; dots are escaped so they match literally. -->
+ <regex>^(2\.([7-9]|[1-9][0-9]+)|[3-9]|[1-9][0-9]+)(\..*)?$</regex>
+ <regexMessage>HDFS Snapshot replication only works with hadoop version >= 2.7.0</regexMessage>
+ </requireProperty>
+ </rules>
+ <fail>true</fail>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <modules>
+ <module>addons/hdfs-snapshot-mirroring</module>
+ </modules>
+ </profile>
+
<profile>
<id>adf</id>
<modules>
@@ -627,6 +662,12 @@
</dependency>
<dependency>
+ <groupId>org.apache.falcon</groupId>
+ <artifactId>falcon-extensions</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>2.1</version>
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/scheduler/src/test/resources/startup.properties
----------------------------------------------------------------------
diff --git a/scheduler/src/test/resources/startup.properties b/scheduler/src/test/resources/startup.properties
index 6216b70..46031e3 100644
--- a/scheduler/src/test/resources/startup.properties
+++ b/scheduler/src/test/resources/startup.properties
@@ -84,7 +84,7 @@ debug.libext.process.paths=${falcon.libext}
##### List of shared libraries for Falcon workflows #####
-*.shared.libs=activemq-all,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3
+*.shared.libs=activemq-all,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3,commons-el
######### Authentication Properties #########
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index 3601e22..2f8f514 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -102,7 +102,7 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
*.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory
##### List of shared libraries for Falcon workflows #####
-*.shared.libs=activemq-all,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3
+*.shared.libs=activemq-all,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3,commons-el
##### Workflow Job Execution Completion listeners #####
*.workflow.execution.listeners=org.apache.falcon.handler.SLAMonitoringHandler
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/src/main/assemblies/assembly-standalone.xml
----------------------------------------------------------------------
diff --git a/src/main/assemblies/assembly-standalone.xml b/src/main/assemblies/assembly-standalone.xml
index cc1486a..d3111b7 100644
--- a/src/main/assemblies/assembly-standalone.xml
+++ b/src/main/assemblies/assembly-standalone.xml
@@ -172,6 +172,40 @@
</fileSet>
<fileSet>
+ <directory>addons/extensions/hdfs-snapshot-mirroring/src/main/META</directory>
+ <outputDirectory>extensions/hdfs-snapshot-mirroring/META</outputDirectory>
+ <fileMode>0755</fileMode>
+ <directoryMode>0755</directoryMode>
+ </fileSet>
+
+ <fileSet>
+ <directory>./</directory>
+ <outputDirectory>extensions/hdfs-snapshot-mirroring/libs/build</outputDirectory>
+ <excludes>
+ <exclude>*/**</exclude>
+ </excludes>
+ <fileMode>0755</fileMode>
+ <directoryMode>0755</directoryMode>
+ </fileSet>
+
+ <fileSet>
+ <directory>./</directory>
+ <outputDirectory>extensions/hdfs-snapshot-mirroring/libs/runtime</outputDirectory>
+ <excludes>
+ <exclude>*/**</exclude>
+ </excludes>
+ <fileMode>0755</fileMode>
+ <directoryMode>0755</directoryMode>
+ </fileSet>
+
+ <fileSet>
+ <directory>addons/extensions/hdfs-snapshot-mirroring/src/main/resources</directory>
+ <outputDirectory>extensions/hdfs-snapshot-mirroring/resources</outputDirectory>
+ <fileMode>0755</fileMode>
+ <directoryMode>0755</directoryMode>
+ </fileSet>
+
+ <fileSet>
<directory>addons/extensions/hive-mirroring/src/main/META</directory>
<outputDirectory>extensions/hive-mirroring/META</outputDirectory>
<fileMode>0755</fileMode>
@@ -247,6 +281,12 @@
</file>
<file>
+ <source>addons/extensions/hdfs-snapshot-mirroring/README</source>
+ <outputDirectory>extensions/hdfs-snapshot-mirroring</outputDirectory>
+ <fileMode>0755</fileMode>
+ </file>
+
+ <file>
<source>addons/extensions/hive-mirroring/README</source>
<outputDirectory>extensions/hive-mirroring</outputDirectory>
<fileMode>0755</fileMode>
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/src/main/assemblies/distributed-package.xml
----------------------------------------------------------------------
diff --git a/src/main/assemblies/distributed-package.xml b/src/main/assemblies/distributed-package.xml
index 502018d..eb45c6f 100644
--- a/src/main/assemblies/distributed-package.xml
+++ b/src/main/assemblies/distributed-package.xml
@@ -150,6 +150,40 @@
</fileSet>
<fileSet>
+ <directory>addons/extensions/hdfs-snapshot-mirroring/src/main/META</directory>
+ <outputDirectory>extensions/hdfs-snapshot-mirroring/META</outputDirectory>
+ <fileMode>0755</fileMode>
+ <directoryMode>0755</directoryMode>
+ </fileSet>
+
+ <fileSet>
+ <directory>./</directory>
+ <outputDirectory>extensions/hdfs-snapshot-mirroring/libs/build</outputDirectory>
+ <excludes>
+ <exclude>*/**</exclude>
+ </excludes>
+ <fileMode>0755</fileMode>
+ <directoryMode>0755</directoryMode>
+ </fileSet>
+
+ <fileSet>
+ <directory>./</directory>
+ <outputDirectory>extensions/hdfs-snapshot-mirroring/libs/runtime</outputDirectory>
+ <excludes>
+ <exclude>*/**</exclude>
+ </excludes>
+ <fileMode>0755</fileMode>
+ <directoryMode>0755</directoryMode>
+ </fileSet>
+
+ <fileSet>
+ <directory>addons/extensions/hdfs-snapshot-mirroring/src/main/resources</directory>
+ <outputDirectory>extensions/hdfs-snapshot-mirroring/resources</outputDirectory>
+ <fileMode>0755</fileMode>
+ <directoryMode>0755</directoryMode>
+ </fileSet>
+
+ <fileSet>
<directory>addons/extensions/hive-mirroring/src/main/META</directory>
<outputDirectory>extensions/hive-mirroring/META</outputDirectory>
<fileMode>0755</fileMode>
@@ -250,6 +284,12 @@
</file>
<file>
+ <source>../addons/extensions/hdfs-snapshot-mirroring/README</source>
+ <outputDirectory>extensions/hdfs-snapshot-mirroring</outputDirectory>
+ <fileMode>0755</fileMode>
+ </file>
+
+ <file>
<source>../addons/extensions/hive-mirroring/README</source>
<outputDirectory>extensions/hive-mirroring</outputDirectory>
<fileMode>0755</fileMode>
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/src/main/assemblies/standalone-package.xml
----------------------------------------------------------------------
diff --git a/src/main/assemblies/standalone-package.xml b/src/main/assemblies/standalone-package.xml
index eac2e11..0b5c69a 100644
--- a/src/main/assemblies/standalone-package.xml
+++ b/src/main/assemblies/standalone-package.xml
@@ -155,6 +155,40 @@
</fileSet>
<fileSet>
+ <directory>addons/extensions/hdfs-snapshot-mirroring/src/main/META</directory>
+ <outputDirectory>extensions/hdfs-snapshot-mirroring/META</outputDirectory>
+ <fileMode>0755</fileMode>
+ <directoryMode>0755</directoryMode>
+ </fileSet>
+
+ <fileSet>
+ <directory>./</directory>
+ <outputDirectory>extensions/hdfs-snapshot-mirroring/libs/build</outputDirectory>
+ <excludes>
+ <exclude>*/**</exclude>
+ </excludes>
+ <fileMode>0755</fileMode>
+ <directoryMode>0755</directoryMode>
+ </fileSet>
+
+ <fileSet>
+ <directory>./</directory>
+ <outputDirectory>extensions/hdfs-snapshot-mirroring/libs/runtime</outputDirectory>
+ <excludes>
+ <exclude>*/**</exclude>
+ </excludes>
+ <fileMode>0755</fileMode>
+ <directoryMode>0755</directoryMode>
+ </fileSet>
+
+ <fileSet>
+ <directory>addons/extensions/hdfs-snapshot-mirroring/src/main/resources</directory>
+ <outputDirectory>extensions/hdfs-snapshot-mirroring/resources</outputDirectory>
+ <fileMode>0755</fileMode>
+ <directoryMode>0755</directoryMode>
+ </fileSet>
+
+ <fileSet>
<directory>addons/extensions/hive-mirroring/src/main/META</directory>
<outputDirectory>extensions/hive-mirroring/META</outputDirectory>
<fileMode>0755</fileMode>
@@ -235,6 +269,12 @@
</file>
<file>
+ <source>../addons/extensions/hdfs-snapshot-mirroring/README</source>
+ <outputDirectory>extensions/hdfs-snapshot-mirroring</outputDirectory>
+ <fileMode>0755</fileMode>
+ </file>
+
+ <file>
<source>../addons/extensions/hive-mirroring/README</source>
<outputDirectory>extensions/hive-mirroring</outputDirectory>
<fileMode>0755</fileMode>
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/test-util/pom.xml
----------------------------------------------------------------------
diff --git a/test-util/pom.xml b/test-util/pom.xml
index 5b4a8c8..9f60119 100644
--- a/test-util/pom.xml
+++ b/test-util/pom.xml
@@ -45,8 +45,14 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<classifier>tests</classifier>
+ <scope>compile</scope>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/test-util/src/main/java/org/apache/falcon/cluster/util/MiniHdfsClusterUtil.java
----------------------------------------------------------------------
diff --git a/test-util/src/main/java/org/apache/falcon/cluster/util/MiniHdfsClusterUtil.java b/test-util/src/main/java/org/apache/falcon/cluster/util/MiniHdfsClusterUtil.java
new file mode 100644
index 0000000..e1aee2e
--- /dev/null
+++ b/test-util/src/main/java/org/apache/falcon/cluster/util/MiniHdfsClusterUtil.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.cluster.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import java.io.File;
+
+/**
+ * Create a local MiniDFS cluster for testing snapshots et al.
+ */
+public final class MiniHdfsClusterUtil {
+
+    /** NameNode port used by the extension tests; the test cluster XMLs point at the same port. */
+    public static final int EXTENSION_TEST_PORT = 54134;
+    /** NameNode port reserved, judging by its name, for the snapshot eviction tests. */
+    public static final int SNAPSHOT_EVICTION_TEST_PORT = 54135;
+    /** NameNode port reserved, judging by its name, for the snapshot replication tests. */
+    public static final int SNAPSHOT_REPL_TEST_PORT = 54136;
+
+    private MiniHdfsClusterUtil() {}
+
+    /**
+     * Builds a MiniDFS cluster whose NameNode listens on the given port and whose
+     * data lives under the given base directory.
+     *
+     * @param port    NameNode port for the cluster
+     * @param baseDir directory handed to MiniDFS as its base directory
+     * @return the cluster returned by the builder
+     * @throws Exception if the cluster cannot be built
+     */
+    public static MiniDFSCluster initMiniDfs(int port, File baseDir) throws Exception {
+        Configuration conf = new Configuration();
+        conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+        MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+        builder.nameNodePort(port);
+        return builder.build();
+    }
+
+    /**
+     * Shuts the cluster down and removes its base directory.
+     * The directory is deleted even when the shutdown throws, and a {@code null}
+     * cluster or directory is skipped instead of causing a NullPointerException.
+     *
+     * @param miniDFSCluster cluster to shut down; may be {@code null}
+     * @param baseDir        directory to delete recursively; may be {@code null}
+     * @throws Exception if shutting the cluster down fails
+     */
+    public static void cleanupDfs(MiniDFSCluster miniDFSCluster, File baseDir) throws Exception {
+        try {
+            if (miniDFSCluster != null) {
+                miniDFSCluster.shutdown();
+            }
+        } finally {
+            // Always clean the on-disk state so a failed shutdown does not leak the directory.
+            if (baseDir != null) {
+                FileUtil.fullyDelete(baseDir);
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/unit/pom.xml
----------------------------------------------------------------------
diff --git a/unit/pom.xml b/unit/pom.xml
index f1ef463..b233acf 100644
--- a/unit/pom.xml
+++ b/unit/pom.xml
@@ -29,19 +29,31 @@
<artifactId>falcon-unit</artifactId>
- <dependencies>
+ <profiles>
+ <profile>
+ <id>hadoop-2</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <properties>
+ <hadoop.version>2.7.1</hadoop.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+ <dependencies>
<dependency>
<groupId>org.apache.falcon</groupId>
<artifactId>falcon-common</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- </dependency>
-
- <dependency>
<groupId>org.apache.oozie</groupId>
<artifactId>oozie-core</artifactId>
<exclusions>
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/unit/src/main/java/org/apache/falcon/unit/LocalFalconRPCClientFactory.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalFalconRPCClientFactory.java b/unit/src/main/java/org/apache/falcon/unit/LocalFalconRPCClientFactory.java
index 3070689..595f75c 100644
--- a/unit/src/main/java/org/apache/falcon/unit/LocalFalconRPCClientFactory.java
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalFalconRPCClientFactory.java
@@ -17,10 +17,6 @@
*/
package org.apache.falcon.unit;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
@@ -45,6 +41,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
@@ -71,6 +69,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RpcClientFactory;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+
/**
* A Dummy implementation of RpcClientFactory that does not do RPC.
* This is required as OozieClient tries to connect to RM via RPC to kill jobs which fails in local mode.
@@ -179,6 +181,12 @@ public final class LocalFalconRPCClientFactory implements RpcClientFactory {
}
@Override
+ public GetLabelsToNodesResponse getLabelsToNodes(GetLabelsToNodesRequest getLabelsToNodesRequest)
+ throws YarnException, IOException {
+ return null;
+ }
+
+ @Override
public GetClusterNodeLabelsResponse getClusterNodeLabels(GetClusterNodeLabelsRequest
getClusterNodeLabelsRequest) throws YarnException, IOException {
return null;
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/unit/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/unit/src/main/resources/startup.properties b/unit/src/main/resources/startup.properties
index 4dfea31..0e404cc 100644
--- a/unit/src/main/resources/startup.properties
+++ b/unit/src/main/resources/startup.properties
@@ -79,7 +79,7 @@ debug.libext.process.paths=${falcon.libext}
##### List of shared libraries for Falcon workflows #####
-*.shared.libs=activemq-all,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3
+*.shared.libs=activemq-all,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3,commons-el
######### Authentication Properties #########
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/webapp/pom.xml
----------------------------------------------------------------------
diff --git a/webapp/pom.xml b/webapp/pom.xml
index dad0581..3582be1 100644
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -88,6 +88,16 @@
</dependencies>
</profile>
<profile>
+ <id>hdfs-snapshot-mirroring</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.falcon</groupId>
+ <artifactId>falcon-hdfs-snapshot-mirroring</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
<id>adf</id>
<dependencies>
<dependency>
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/webapp/src/test/resources/startup.properties
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/startup.properties b/webapp/src/test/resources/startup.properties
index 3544f0a..58018f1 100644
--- a/webapp/src/test/resources/startup.properties
+++ b/webapp/src/test/resources/startup.properties
@@ -87,7 +87,7 @@ debug.libext.process.paths=${falcon.libext}
##### List of shared libraries for Falcon workflows #####
-*.shared.libs=activemq-all,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3
+*.shared.libs=activemq-all,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3,commons-el
######### Authentication Properties #########
[2/2] falcon git commit: FALCON-1861 Support HDFS Snapshot based
replication in Falcon
Posted by ba...@apache.org.
FALCON-1861 Support HDFS Snapshot based replication in Falcon
Documentation will be added in Jira FALCON-1908
Author: bvellanki <bv...@hortonworks.com>
Reviewers: "Sowmya <sr...@hortonworks.com>, sandeepSamudrala <sa...@gmail.com>, Ying Zheng <yz...@hortonworks.com>, Venkat Ranganathan <ve...@hortonworks.com>"
Closes #105 from bvellanki/master
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/aba79aae
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/aba79aae
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/aba79aae
Branch: refs/heads/master
Commit: aba79aae2c7235f065413fe0c9f0af5077079371
Parents: 7c0481e
Author: bvellanki <bv...@hortonworks.com>
Authored: Thu Apr 21 16:48:16 2016 -0700
Committer: bvellanki <bv...@hortonworks.com>
Committed: Thu Apr 21 16:48:16 2016 -0700
----------------------------------------------------------------------
.../extensions/hdfs-snapshot-mirroring/README | 31 +++
.../extensions/hdfs-snapshot-mirroring/pom.xml | 32 +++
.../hdfs-snapshot-mirroring-properties.json | 173 ++++++++++++
.../hdfs-snapshot-mirroring-template.xml | 45 +++
.../hdfs-snapshot-mirroring-workflow.xml | 172 ++++++++++++
addons/hdfs-snapshot-mirroring/README | 31 +++
addons/hdfs-snapshot-mirroring/pom.xml | 188 +++++++++++++
.../replication/HdfsSnapshotReplicator.java | 277 +++++++++++++++++++
.../retention/HdfsSnapshotEvictor.java | 208 ++++++++++++++
.../falcon/snapshots/util/HdfsSnapshotUtil.java | 67 +++++
.../replication/HdfsSnapshotReplicatorTest.java | 163 +++++++++++
.../retention/HdfsSnapshotEvictorTest.java | 98 +++++++
.../src/test/resources/backup-cluster-0.1.xml | 44 +++
.../src/test/resources/primary-cluster-0.1.xml | 43 +++
.../org/apache/falcon/entity/ClusterHelper.java | 14 +
.../falcon/hadoop/HadoopClientFactory.java | 93 ++++++-
.../apache/falcon/retention/EvictionHelper.java | 7 +-
common/src/main/resources/startup.properties | 2 +-
.../falcon/retention/EvictionHelperTest.java | 35 +++
extensions/pom.xml | 29 ++
.../falcon/extensions/AbstractExtension.java | 3 +
.../HdfsSnapshotMirrorProperties.java | 84 ++++++
.../HdfsSnapshotMirroringExtension.java | 234 ++++++++++++++++
.../util/ExtensionProcessBuilderUtils.java | 2 +-
.../apache/falcon/extensions/ExtensionTest.java | 165 ++++++++++-
.../extensions/store/ExtensionStoreTest.java | 6 +-
.../src/test/resources/backup-cluster-0.1.xml | 3 +-
.../hdfs-snapshot-mirroring-template.xml | 45 +++
.../src/test/resources/primary-cluster-0.1.xml | 2 +-
pom.xml | 43 ++-
scheduler/src/test/resources/startup.properties | 2 +-
src/conf/startup.properties | 2 +-
src/main/assemblies/assembly-standalone.xml | 40 +++
src/main/assemblies/distributed-package.xml | 40 +++
src/main/assemblies/standalone-package.xml | 40 +++
test-util/pom.xml | 6 +
.../cluster/util/MiniHdfsClusterUtil.java | 52 ++++
unit/pom.xml | 24 +-
.../unit/LocalFalconRPCClientFactory.java | 16 +-
unit/src/main/resources/startup.properties | 2 +-
webapp/pom.xml | 10 +
webapp/src/test/resources/startup.properties | 2 +-
42 files changed, 2537 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/extensions/hdfs-snapshot-mirroring/README
----------------------------------------------------------------------
diff --git a/addons/extensions/hdfs-snapshot-mirroring/README b/addons/extensions/hdfs-snapshot-mirroring/README
new file mode 100644
index 0000000..fc33d3a
--- /dev/null
+++ b/addons/extensions/hdfs-snapshot-mirroring/README
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+HDFS Snapshot Mirroring Extension.
+
+Overview
+This extension implements replication for snapshot-able directories on HDFS from one
+Hadoop cluster to another. This piggybacks on the snapshot solution supported in HDFS (HDFS-7535).
+It also performs retention on the snapshots generated in source and target.
+
+Use Case
+* Create snapshots in source directory
+* Copy this directory between HDFS clusters
+* Create snapshot in target directory
+* Handle snapshot retention in source and target directories
+
+Limitations
+If TDE encryption is enabled, this snapshot based replication is not efficient.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/extensions/hdfs-snapshot-mirroring/pom.xml
----------------------------------------------------------------------
diff --git a/addons/extensions/hdfs-snapshot-mirroring/pom.xml b/addons/extensions/hdfs-snapshot-mirroring/pom.xml
new file mode 100644
index 0000000..b0b4819
--- /dev/null
+++ b/addons/extensions/hdfs-snapshot-mirroring/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.falcon.extensions</groupId>
+ <artifactId>falcon-hdfs-snapshot-mirroring-extension</artifactId>
+ <version>0.10-SNAPSHOT</version>
+ <description>Apache Falcon HDFS Snapshot Mirroring Extension</description>
+ <name>Apache Falcon Sample HDFS Snapshot Mirroring Extension</name>
+ <packaging>jar</packaging>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/extensions/hdfs-snapshot-mirroring/src/main/META/hdfs-snapshot-mirroring-properties.json
----------------------------------------------------------------------
diff --git a/addons/extensions/hdfs-snapshot-mirroring/src/main/META/hdfs-snapshot-mirroring-properties.json b/addons/extensions/hdfs-snapshot-mirroring/src/main/META/hdfs-snapshot-mirroring-properties.json
new file mode 100644
index 0000000..46554c1
--- /dev/null
+++ b/addons/extensions/hdfs-snapshot-mirroring/src/main/META/hdfs-snapshot-mirroring-properties.json
@@ -0,0 +1,173 @@
+{
+ "shortDescription": "This extension implements replicating snapshotable directories on HDFS from one Hadoop cluster to another.",
+ "properties":[
+ {
+ "propertyName":"jobName",
+ "required":true,
+ "description":"Unique hdfs snapshot mirroring job name",
+ "example":"hdfs-snapshot-daily-mirror"
+ },
+ {
+ "propertyName":"jobClusterName",
+ "required":true,
+ "description":"Cluster where job should run",
+ "example":"backupCluster"
+ },
+ {
+ "propertyName":"jobValidityStart",
+ "required":true,
+ "description":"Job cluster validity start time",
+ "example":"2016-03-03T00:00Z"
+ },
+ {
+ "propertyName":"jobValidityEnd",
+ "required":true,
+ "description":"Job cluster validity end time",
+ "example":"2018-03-13T00:00Z"
+ },
+ {
+ "propertyName":"jobFrequency",
+ "required":true,
+ "description":"Frequency of mirroring job. Valid frequency types are minutes(int), hours(int), days(int), months(int)",
+ "example":"months(1)"
+ },
+ {
+ "propertyName":"jobTimezone",
+ "required":false,
+ "description":"Time zone for the job",
+ "example":"UTC"
+ },
+ {
+ "propertyName":"jobTags",
+ "required":false,
+ "description":"List of comma separated tags. Key Value pairs, separated by comma",
+ "example":"consumer=consumer@xyz.com, owner=producer@xyz.com, _department_type=forecasting"
+ },
+ {
+ "propertyName":"jobRetryPolicy",
+ "required":false,
+ "description":"Job retry policy",
+ "example":"periodic"
+ },
+ {
+ "propertyName":"jobRetryDelay",
+ "required":false,
+ "description":"Job retry delay",
+ "example":"minutes(30)"
+ },
+ {
+ "propertyName":"jobRetryAttempts",
+ "required":false,
+ "description":"Job retry attempts",
+ "example":"3"
+ },
+ {
+ "propertyName":"jobAclOwner",
+ "required":false,
+ "description":"ACL owner",
+ "example":"ambari-qa"
+ },
+ {
+ "propertyName":"jobAclGroup",
+ "required":false,
+ "description":"ACL group",
+ "example":"users"
+ },
+ {
+ "propertyName":"jobAclPermission",
+ "required":false,
+ "description":"ACL permission",
+ "example":"*"
+ },
+ {
+ "propertyName":"sourceCluster",
+ "required":true,
+ "description":"Source cluster for hdfs snapshot replication",
+ "example":"primaryCluster"
+ },
+ {
+ "propertyName":"sourceSnapshotDir",
+ "required":true,
+ "description":"Snapshot-able source directory which should be replicated",
+ "example":"/user/ambari-qa/snapshot/test/primaryCluster/input"
+ },
+ {
+ "propertyName":"sourceSnapshotRetentionPolicy",
+ "required":false,
+ "description":"Retention policy for snapshots created on source. Default is delete (right now, only delete is supported)",
+ "example":"delete"
+ },
+ {
+ "propertyName":"sourceSnapshotRetentionAgeLimit",
+ "required":true,
+ "description":"Snapshots on source older than this age limit will be eligible for deletion.",
+ "example":"days(7)"
+ },
+ {
+ "propertyName":"sourceSnapshotRetentionNumber",
+ "required":true,
+ "description":"Number of most recent snapshots on source to retain; the remaining snapshots are eligible for deletion.",
+ "example":"10"
+ },
+ {
+ "propertyName":"targetCluster",
+ "required":true,
+ "description":"Target cluster for hdfs snapshot replication",
+ "example":"backupCluster"
+ },
+ {
+ "propertyName":"targetSnapshotDir",
+ "required":true,
+ "description":"Snapshot-able target directory to which source should be replicated",
+ "example":"/user/ambari-qa/snapshot/test/backupCluster/replica/"
+ },
+ {
+ "propertyName":"targetSnapshotRetentionPolicy",
+ "required":false,
+ "description":"Retention policy for snapshots created on target. Default is delete (right now, only delete is supported)",
+ "example":"delete"
+ },
+ {
+ "propertyName":"targetSnapshotRetentionAgeLimit",
+ "required":true,
+ "description":"Snapshots on target older than this age limit will be eligible for deletion.",
+ "example":"days(7)"
+ },
+ {
+ "propertyName":"targetSnapshotRetentionNumber",
+ "required":true,
+ "description":"Number of most recent snapshots on target to retain; the remaining snapshots are eligible for deletion.",
+ "example":"10"
+ },
+ {
+ "propertyName":"distcpMaxMaps",
+ "required":false,
+ "description":"Maximum number of mappers for DistCP",
+ "example":"1"
+ },
+ {
+ "propertyName":"distcpMapBandwidth",
+ "required":false,
+ "description":"Bandwidth in MB for each mapper in DistCP",
+ "example":"100"
+ },
+ {
+ "propertyName":"tdeEncryptionEnabled",
+ "required":false,
+ "description":"Specify if TDE based encryption is enabled on source and target dirs",
+ "example":"false"
+ },
+ {
+ "propertyName":"jobNotificationType",
+ "required":false,
+ "description":"Email Notification for Falcon instance completion",
+ "example":"email"
+ },
+ {
+ "propertyName":"jobNotificationReceivers",
+ "required":false,
+ "description":"Comma separated email IDs",
+ "example":"user1@gmail.com, user2@gmail.com"
+ }
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-template.xml
----------------------------------------------------------------------
diff --git a/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-template.xml b/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-template.xml
new file mode 100644
index 0000000..29131da
--- /dev/null
+++ b/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-template.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<process name="##jobName##" xmlns="uri:falcon:process:0.1">
+ <clusters>
+ <!-- cluster on which the mirroring job runs -->
+ <cluster name="##jobClusterName##">
+ <validity end="##jobValidityEnd##" start="##jobValidityStart##"/>
+ </cluster>
+ </clusters>
+
+ <tags/>
+
+ <parallel>1</parallel>
+ <!-- Replication needs to run only once to catch up -->
+ <order>LAST_ONLY</order>
+ <frequency>##jobFrequency##</frequency>
+ <timezone>##jobTimezone##</timezone>
+
+ <properties>
+ <property name="oozie.wf.subworkflow.classpath.inheritance" value="true"/>
+ </properties>
+
+ <workflow name="##jobWorkflowName##" engine="##jobWorkflowEngine##"
+ path="##jobWorkflowPath##" lib="##jobWorkflowLibPath##"/>
+ <retry policy="##jobRetryPolicy##" delay="##jobRetryDelay##" attempts="3"/>
+ <notification type="##jobNotificationType##" to="##jobNotificationReceivers##"/>
+ <ACL/>
+</process>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-workflow.xml b/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-workflow.xml
new file mode 100644
index 0000000..c735167
--- /dev/null
+++ b/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-workflow.xml
@@ -0,0 +1,172 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<workflow-app xmlns='uri:oozie:workflow:0.3' name='falcon-hdfs-snapshot-mirroring'>
+ <start to='snapshot-replication'/>
+ <!-- Snapshot replication action -->
+ <action name="snapshot-replication">
+ <java>
+ <job-tracker>${jobTracker}</job-tracker>
+ <name-node>${nameNode}</name-node>
+ <configuration>
+ <property> <!-- hadoop 2 parameter -->
+ <name>oozie.launcher.mapreduce.job.user.classpath.first</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>mapred.job.queue.name</name>
+ <value>${queueName}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapred.job.priority</name>
+ <value>${jobPriority}</value>
+ </property>
+ <property>
+ <name>oozie.use.system.libpath</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>oozie.launcher.oozie.libpath</name>
+ <value>${wf:conf("falcon.libpath")}</value>
+ </property>
+ <property>
+ <name>oozie.action.sharelib.for.java</name>
+ <value>distcp</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapreduce.job.hdfs-servers</name>
+ <value>${sourceNN},${targetNN}</value>
+ </property>
+ <property>
+ <name>mapreduce.job.hdfs-servers</name>
+ <value>${sourceNN},${targetNN}</value>
+ </property>
+ </configuration>
+ <main-class>org.apache.falcon.snapshots.replication.HdfsSnapshotReplicator</main-class>
+ <arg>-Dmapred.job.queue.name=${queueName}</arg>
+ <arg>-Dmapred.job.priority=${jobPriority}</arg>
+ <arg>-distcpMaxMaps</arg>
+ <arg>${distcpMaxMaps}</arg>
+ <arg>-distcpMapBandwidth</arg>
+ <arg>${distcpMapBandwidth}</arg>
+ <arg>-sourceNN</arg>
+ <arg>${sourceNN}</arg>
+ <arg>-sourceExecUrl</arg>
+ <arg>${sourceExecUrl}</arg>
+ <arg>-sourceNNKerberosPrincipal</arg>
+ <arg>${sourceNNKerberosPrincipal}</arg>
+ <arg>-sourceSnapshotDir</arg>
+ <arg>${sourceSnapshotDir}</arg>
+ <arg>-targetNN</arg>
+ <arg>${targetNN}</arg>
+ <arg>-targetExecUrl</arg>
+ <arg>${targetExecUrl}</arg>
+ <arg>-targetNNKerberosPrincipal</arg>
+ <arg>${targetNNKerberosPrincipal}</arg>
+ <arg>-targetSnapshotDir</arg>
+ <arg>${targetSnapshotDir}</arg>
+ <arg>-tdeEncryptionEnabled</arg>
+ <arg>${tdeEncryptionEnabled}</arg>
+ <arg>-snapshotJobName</arg>
+ <arg>${snapshotJobName}-${nominalTime}</arg>
+ </java>
+ <ok to="snapshot-retention"/>
+ <error to="fail"/>
+ </action>
+ <!-- Snapshot retention action -->
+ <action name="snapshot-retention">
+ <java>
+ <job-tracker>${jobTracker}</job-tracker>
+ <name-node>${nameNode}</name-node>
+ <configuration>
+ <property> <!-- hadoop 2 parameter -->
+ <name>oozie.launcher.mapreduce.job.user.classpath.first</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>mapred.job.queue.name</name>
+ <value>${queueName}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapred.job.priority</name>
+ <value>${jobPriority}</value>
+ </property>
+ <property>
+ <name>oozie.use.system.libpath</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>oozie.action.sharelib.for.java</name>
+ <value>distcp</value>
+ </property>
+ <property>
+ <name>oozie.launcher.oozie.libpath</name>
+ <value>${wf:conf("falcon.libpath")}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapreduce.job.hdfs-servers</name>
+ <value>${sourceNN},${targetNN}</value>
+ </property>
+ <property>
+ <name>mapreduce.job.hdfs-servers</name>
+ <value>${sourceNN},${targetNN}</value>
+ </property>
+ </configuration>
+ <main-class>org.apache.falcon.snapshots.retention.HdfsSnapshotEvictor</main-class>
+ <arg>-Dmapred.job.queue.name=${queueName}</arg>
+ <arg>-Dmapred.job.priority=${jobPriority}</arg>
+ <arg>-sourceNN</arg>
+ <arg>${sourceNN}</arg>
+ <arg>-sourceExecUrl</arg>
+ <arg>${sourceExecUrl}</arg>
+ <arg>-sourceNNKerberosPrincipal</arg>
+ <arg>${sourceNNKerberosPrincipal}</arg>
+ <arg>-sourceSnapshotDir</arg>
+ <arg>${sourceSnapshotDir}</arg>
+ <arg>-sourceSnapshotRetentionPolicy</arg>
+ <arg>${sourceSnapshotRetentionPolicy}</arg>
+ <arg>-sourceSnapshotRetentionAgeLimit</arg>
+ <arg>${sourceSnapshotRetentionAgeLimit}</arg>
+ <arg>-sourceSnapshotRetentionNumber</arg>
+ <arg>${sourceSnapshotRetentionNumber}</arg>
+ <arg>-targetNN</arg>
+ <arg>${targetNN}</arg>
+ <arg>-targetExecUrl</arg>
+ <arg>${targetExecUrl}</arg>
+ <arg>-targetNNKerberosPrincipal</arg>
+ <arg>${targetNNKerberosPrincipal}</arg>
+ <arg>-targetSnapshotDir</arg>
+ <arg>${targetSnapshotDir}</arg>
+ <arg>-targetSnapshotRetentionPolicy</arg>
+ <arg>${targetSnapshotRetentionPolicy}</arg>
+ <arg>-targetSnapshotRetentionAgeLimit</arg>
+ <arg>${targetSnapshotRetentionAgeLimit}</arg>
+ <arg>-targetSnapshotRetentionNumber</arg>
+ <arg>${targetSnapshotRetentionNumber}</arg>
+ <arg>-snapshotJobName</arg>
+ <arg>${snapshotJobName}-${nominalTime}</arg>
+ </java>
+ <ok to="end"/>
+ <error to="fail"/>
+ </action>
+ <kill name="fail">
+ <message>
+ Workflow action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+ </message>
+ </kill>
+ <end name="end"/>
+</workflow-app>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/hdfs-snapshot-mirroring/README
----------------------------------------------------------------------
diff --git a/addons/hdfs-snapshot-mirroring/README b/addons/hdfs-snapshot-mirroring/README
new file mode 100644
index 0000000..fc33d3a
--- /dev/null
+++ b/addons/hdfs-snapshot-mirroring/README
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+HDFS Snapshot Mirroring Extension.
+
+Overview
+This extension implements replication for snapshot-able directories on HDFS from one
+Hadoop cluster to another. This piggybacks on the snapshot solution supported in HDFS (HDFS-7535).
+It also performs retention on the snapshots generated in source and target.
+
+Use Case
+* Create snapshots in source directory
+* Copy this directory between HDFS clusters
+* Create snapshot in target directory
+* Handle snapshot retention in source and target directories
+
+Limitations
+If TDE encryption is enabled, this snapshot based replication is not efficient.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/hdfs-snapshot-mirroring/pom.xml
----------------------------------------------------------------------
diff --git a/addons/hdfs-snapshot-mirroring/pom.xml b/addons/hdfs-snapshot-mirroring/pom.xml
new file mode 100644
index 0000000..5240d62
--- /dev/null
+++ b/addons/hdfs-snapshot-mirroring/pom.xml
@@ -0,0 +1,188 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.falcon</groupId>
+ <artifactId>falcon-main</artifactId>
+ <version>0.10-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+ <artifactId>falcon-hdfs-snapshot-mirroring</artifactId>
+ <description>Apache Falcon HDFS Snapshot based Replication Module</description>
+ <name>Apache Falcon HDFS Snapshot Replication</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <!-- dependencies are always listed in sorted order by groupId, artifactId -->
+ <!-- intra-project -->
+
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ </dependency>
+ <!-- inter-project -->
+ <dependency>
+ <groupId>javax.jms</groupId>
+ <artifactId>jms</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>3.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.falcon</groupId>
+ <artifactId>falcon-test-util</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.falcon</groupId>
+ <artifactId>falcon-extensions</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.falcon</groupId>
+ <artifactId>falcon-hadoop-dependencies</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.falcon</groupId>
+ <artifactId>falcon-metrics</artifactId>
+ </dependency>
+ </dependencies>
+
+ <profiles>
+ <profile>
+ <id>hadoop-2</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <classifier>tests</classifier>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <classifier>tests</classifier>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <scope>compile</scope>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-nodemanager</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-distcp</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <version>${derby.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+
+ <build>
+ <sourceDirectory>${basedir}/src/main/java</sourceDirectory>
+ <testSourceDirectory>${basedir}/src/test/java</testSourceDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemProperties>
+ <property>
+ <name>derby.stream.error.file</name>
+ <value>target/derby.log</value>
+ </property>
+ </systemProperties>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicator.java
----------------------------------------------------------------------
diff --git a/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicator.java b/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicator.java
new file mode 100644
index 0000000..2e41cc0
--- /dev/null
+++ b/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicator.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.snapshots.replication;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirrorProperties;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.snapshots.util.HdfsSnapshotUtil;
+import org.apache.falcon.workflow.util.OozieActionConfigurationHelper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.tools.DistCp;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * HDFS snapshot generator and snapshot based replicator.
+ */
+public class HdfsSnapshotReplicator extends Configured implements Tool {
+ private static final Logger LOG = LoggerFactory.getLogger(HdfsSnapshotReplicator.class);
+ protected CommandLine cmd;
+
+    /**
+     * Command-line entry point. Runs a new {@code HdfsSnapshotReplicator} through
+     * {@link ToolRunner} using the configuration returned by
+     * {@code OozieActionConfigurationHelper.createActionConf()}.
+     *
+     * @param args command line arguments, forwarded unchanged to the tool
+     * @throws Exception if the tool returns a non-zero exit code, or if the run itself throws
+     */
+    public static void main(String[] args) throws Exception {
+        Configuration actionConf = OozieActionConfigurationHelper.createActionConf();
+        int exitCode = ToolRunner.run(actionConf, new HdfsSnapshotReplicator(), args);
+        if (exitCode == 0) {
+            return;
+        }
+        // Any non-zero exit code is surfaced as an exception that echoes the arguments.
+        throw new Exception("Unable to perform Snapshot based replication action args: " + Arrays.toString(args));
+    }
+
+    /**
+     * Performs one replication cycle: snapshot the source directory, copy it to the
+     * target, then create a snapshot with the same name on the target.
+     *
+     * The snapshot name is {@code HdfsSnapshotUtil.SNAPSHOT_PREFIX}, followed by the
+     * snapshot job name option, a dash and the current time in milliseconds.
+     *
+     * @param args command line arguments; parsed via {@code getCommand} and kept in {@code cmd}
+     * @return 0 once all three steps have completed
+     * @throws FalconException if a snapshot cannot be created or the copy fails
+     */
+    @Override
+    public int run(String[] args) throws FalconException {
+        cmd = getCommand(args);
+
+        final String sourceNN = cmd.getOptionValue(HdfsSnapshotMirrorProperties.SOURCE_NN.getName());
+        final String targetNN = cmd.getOptionValue(HdfsSnapshotMirrorProperties.TARGET_NN.getName());
+
+        final DistributedFileSystem srcFileSystem = HdfsSnapshotUtil.getSourceFileSystem(cmd);
+        final DistributedFileSystem tgtFileSystem = HdfsSnapshotUtil.getTargetFileSystem(cmd);
+
+        final String jobName = cmd.getOptionValue(HdfsSnapshotMirrorProperties.SNAPSHOT_JOB_NAME.getName());
+        final String snapshotName = HdfsSnapshotUtil.SNAPSHOT_PREFIX + jobName + "-" + System.currentTimeMillis();
+
+        final String srcDir = cmd.getOptionValue(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName());
+        final String tgtDir = cmd.getOptionValue(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName());
+
+        // Step 1: take the snapshot on the source.
+        createSnapshotInFileSystem(srcDir, snapshotName, srcFileSystem);
+
+        // Step 2: copy. When a previously replicated snapshot exists, DistCp uses the
+        // snapshot diff; otherwise this is the first run and a regular DistCp is done.
+        invokeCopy(sourceNN, targetNN, srcFileSystem, tgtFileSystem, srcDir, tgtDir, snapshotName);
+
+        // Step 3: reached only if the copy did not throw; snapshot the target under the same name.
+        createSnapshotInFileSystem(tgtDir, snapshotName, tgtFileSystem);
+
+        LOG.info("Completed HDFS Snapshot Replication.");
+        return 0;
+    }
+
+    /**
+     * Creates a snapshot called {@code snapshotName} of the directory {@code dirName}
+     * on the given file system.
+     *
+     * @param dirName      directory to snapshot
+     * @param snapshotName name to give the new snapshot
+     * @param fs           file system on which the snapshot is created
+     * @throws FalconException wrapping the {@link IOException} if the snapshot cannot be created
+     */
+    private static void createSnapshotInFileSystem(String dirName, String snapshotName,
+                                                   FileSystem fs) throws FalconException {
+        LOG.info("Creating snapshot {} in directory {}", snapshotName, dirName);
+        try {
+            fs.createSnapshot(new Path(dirName), snapshotName);
+        } catch (IOException ioe) {
+            // Identify the file system by its configured default URI in the warning.
+            String fsUri = fs.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY);
+            LOG.warn("Unable to create snapshot {} in filesystem {}. Exception is {}",
+                    snapshotName, fsUri, ioe.getMessage());
+            throw new FalconException("Unable to create snapshot " + snapshotName, ioe);
+        }
+    }
+
+    /**
+     * Copies {@code sourceDir} to {@code targetDir} with DistCp, using the options
+     * assembled by {@code getDistCpOptions}.
+     *
+     * @param sourceStorageUrl    storage URL of the source cluster
+     * @param targetStorageUrl    storage URL of the target cluster
+     * @param sourceFs            source file system, used to look up existing snapshots
+     * @param targetFs            target file system, used to look up existing snapshots
+     * @param sourceDir           directory to copy from
+     * @param targetDir           directory to copy to
+     * @param currentSnapshotName name of the snapshot just taken on the source
+     * @throws FalconException rethrown unchanged when raised while building the options;
+     *                         any other exception is wrapped with the original as its cause
+     */
+    protected void invokeCopy(String sourceStorageUrl, String targetStorageUrl,
+                              DistributedFileSystem sourceFs, DistributedFileSystem targetFs,
+                              String sourceDir, String targetDir,
+                              String currentSnapshotName) throws FalconException {
+        try {
+            Configuration jobConf = this.getConf();
+            DistCpOptions options = getDistCpOptions(sourceStorageUrl, targetStorageUrl,
+                    sourceFs, targetFs, sourceDir, targetDir, currentSnapshotName);
+            DistCp distCp = new DistCp(jobConf, options);
+            LOG.info("Started Snapshot based DistCp from {} to {} ", getStagingUri(sourceStorageUrl, sourceDir),
+                    getStagingUri(targetStorageUrl, targetDir));
+            Job distcpJob = distCp.execute();
+            // Pass the job ID object itself: the logger formats it and tolerates null,
+            // so a missing ID cannot turn a finished copy into a reported failure.
+            LOG.info("DistCp Hadoop job: {}", distcpJob.getJobID());
+            LOG.info("Completed Snapshot based DistCp");
+
+        } catch (FalconException fe) {
+            // Already carries a meaningful message; do not wrap it again.
+            throw fe;
+        } catch (Exception e) {
+            throw new FalconException("Unable to replicate HDFS directory using snapshots.", e);
+        }
+    }
+
+    /**
+     * Assembles the DistCp options for copying {@code sourceDir} to {@code targetDir}.
+     *
+     * Sync-folder and delete-missing are always enabled and the job is run in blocking
+     * mode. A snapshot diff between the latest replicated snapshot and
+     * {@code currentSnapshotName} is requested only when such a replicated snapshot is
+     * found. CRC checks are skipped when the TDE encryption option parses as true.
+     * The maximum number of maps and the per-map bandwidth are read from the command line.
+     *
+     * @param sourceStorageUrl    storage URL of the source cluster
+     * @param targetStorageUrl    storage URL of the target cluster
+     * @param sourceFs            source file system, used to list source snapshots
+     * @param targetFs            target file system, used to list target snapshots
+     * @param sourceDir           directory to copy from
+     * @param targetDir           directory to copy to
+     * @param currentSnapshotName name of the snapshot just taken on the source
+     * @return the populated options
+     * @throws FalconException if the latest replicated snapshot cannot be determined
+     */
+    private DistCpOptions getDistCpOptions(String sourceStorageUrl, String targetStorageUrl,
+                                           DistributedFileSystem sourceFs, DistributedFileSystem targetFs,
+                                           String sourceDir, String targetDir,
+                                           String currentSnapshotName) throws FalconException {
+
+        Path sourcePath = new Path(getStagingUri(sourceStorageUrl, sourceDir));
+        List<Path> sourcePaths = new ArrayList<Path>();
+        sourcePaths.add(sourcePath);
+        Path targetPath = new Path(getStagingUri(targetStorageUrl, targetDir));
+
+        DistCpOptions opts = new DistCpOptions(sourcePaths, targetPath);
+
+        // Required for a snapshot based DistCp; set before the diff and CRC options below.
+        opts.setSyncFolder(true);
+        opts.setDeleteMissing(true);
+
+        // With a previously replicated snapshot, copy only the diff between it and the
+        // current snapshot. Without one, this stays a plain DistCp.
+        String lastReplicated = findLatestReplicatedSnapshot(sourceFs, targetFs, sourceDir, targetDir);
+        if (!StringUtils.isBlank(lastReplicated)) {
+            opts.setUseDiff(true, lastReplicated, currentSnapshotName);
+        }
+
+        String tdeFlag = cmd.getOptionValue(HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName());
+        if (Boolean.parseBoolean(tdeFlag)) {
+            // skip the CRC check when TDE encryption is enabled
+            opts.setSkipCRC(true);
+        }
+
+        opts.setBlocking(true);
+
+        String maxMaps = cmd.getOptionValue(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName());
+        opts.setMaxMaps(Integer.parseInt(maxMaps));
+
+        String mapBandwidth = cmd.getOptionValue(HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName());
+        opts.setMapBandwidth(Integer.parseInt(mapBandwidth));
+
+        return opts;
+    }
+
+    /**
+     * Finds the most recently modified snapshot on the target whose name also exists on the source.
+     *
+     * @return the snapshot name, or {@code null} when the target has no snapshots or none of
+     *         its snapshot names is present on the source
+     * @throws FalconException if listing the snapshot directory on either side fails
+     */
+    private String findLatestReplicatedSnapshot(DistributedFileSystem sourceFs, DistributedFileSystem targetFs,
+                                                String sourceDir, String targetDir) throws FalconException {
+        try {
+            Set<String> namesOnSource = new HashSet<String>();
+            for (FileStatus onSource : sourceFs.listStatus(new Path(getSnapshotDir(sourceDir)))) {
+                namesOnSource.add(onSource.getPath().getName());
+            }
+
+            FileStatus[] onTarget = targetFs.listStatus(new Path(getSnapshotDir(targetDir)));
+            if (onTarget.length == 0) {
+                // No target snapshots at all.
+                return null;
+            }
+
+            // Newest first, judged by modification time.
+            Arrays.sort(onTarget, new Comparator<FileStatus>() {
+                @Override
+                public int compare(FileStatus left, FileStatus right) {
+                    return Long.compare(right.getModificationTime(), left.getModificationTime());
+                }
+            });
+
+            for (FileStatus candidate : onTarget) {
+                String candidateName = candidate.getPath().getName();
+                if (namesOnSource.contains(candidateName)) {
+                    return candidateName;
+                }
+            }
+
+            // Target has snapshots, but none of them was replicated from the source.
+            return null;
+        } catch (IOException e) {
+            LOG.error("Unable to find latest snapshot on targetDir {} {}", targetDir, e.getMessage());
+            throw new FalconException("Unable to find latest snapshot on targetDir " + targetDir, e);
+        }
+    }
+
+    /**
+     * Joins a storage url and a directory with a path separator, after dropping one trailing
+     * separator from the storage url.
+     */
+    private String getStagingUri(String storageUrl, String dir) {
+        String base = StringUtils.removeEnd(storageUrl, Path.SEPARATOR);
+        return base + Path.SEPARATOR + dir;
+    }
+
+    /**
+     * Returns the path of the snapshot directory under {@code dirName}, with a trailing separator.
+     * One trailing separator on {@code dirName} is dropped before joining.
+     */
+    private String getSnapshotDir(String dirName) {
+        String parent = StringUtils.removeEnd(dirName, Path.SEPARATOR);
+        return parent + Path.SEPARATOR + HdfsSnapshotUtil.SNAPSHOT_DIR_PREFIX + Path.SEPARATOR;
+    }
+
+    /**
+     * Parses the command line arguments of the snapshot replicator.
+     *
+     * <p>Every option takes a value. The two NN Kerberos principal options and the TDE option
+     * are optional; all other options are required.
+     *
+     * @param args raw command line arguments
+     * @return the parsed command line
+     * @throws FalconException if the arguments cannot be parsed; the underlying
+     *                         {@link ParseException} is kept as the cause
+     */
+    protected CommandLine getCommand(String[] args) throws FalconException {
+        Options options = new Options();
+
+        addArgument(options, HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName(),
+                "max number of maps to use for distcp", true);
+        addArgument(options, HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName(),
+                "Bandwidth in MB/s used by each mapper during replication", true);
+
+        addArgument(options, HdfsSnapshotMirrorProperties.SOURCE_NN.getName(), "Source NN", true);
+        addArgument(options, HdfsSnapshotMirrorProperties.SOURCE_EXEC_URL.getName(),
+                "Replication instance job Exec Url", true);
+        addArgument(options, HdfsSnapshotMirrorProperties.SOURCE_NN_KERBEROS_PRINCIPAL.getName(),
+                "Replication instance job NN Kerberos Principal", false);
+        addArgument(options, HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName(),
+                "Source snapshot-able dir to replicate", true);
+
+        addArgument(options, HdfsSnapshotMirrorProperties.TARGET_NN.getName(), "Target NN", true);
+        addArgument(options, HdfsSnapshotMirrorProperties.TARGET_EXEC_URL.getName(),
+                "Replication instance target Exec Url", true);
+        addArgument(options, HdfsSnapshotMirrorProperties.TARGET_NN_KERBEROS_PRINCIPAL.getName(),
+                "Replication instance target NN Kerberos Principal", false);
+        addArgument(options, HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName(),
+                "Target snapshot-able dir to replicate", true);
+
+        addArgument(options, HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName(),
+                "Is TDE encryption enabled on dirs being replicated?", false);
+
+        addArgument(options, HdfsSnapshotMirrorProperties.SNAPSHOT_JOB_NAME.getName(),
+                "Replication instance job name", true);
+
+        try {
+            return new GnuParser().parse(options, args);
+        } catch (ParseException pe) {
+            LOG.error("Unable to parse command line arguments for HdfsSnapshotReplicator: {}",
+                    pe.getMessage(), pe);
+            // Keep the parser exception as the cause so the stack trace is not lost.
+            throw new FalconException(pe.getMessage(), pe);
+        }
+    }
+
+    /** Registers one value-taking option with the given name, description and required flag. */
+    private static void addArgument(Options options, String name, String description, boolean required) {
+        Option option = new Option(name, true, description);
+        option.setRequired(required);
+        options.addOption(option);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictor.java
----------------------------------------------------------------------
diff --git a/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictor.java b/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictor.java
new file mode 100644
index 0000000..22e3377
--- /dev/null
+++ b/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictor.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.snapshots.retention;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirrorProperties;
+import org.apache.falcon.retention.EvictionHelper;
+import org.apache.falcon.snapshots.util.HdfsSnapshotUtil;
+import org.apache.falcon.workflow.util.OozieActionConfigurationHelper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.jsp.el.ELException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+
+/**
+ * HDFS snapshot evictor.
+ *
+ * <p>Command line tool that evicts snapshots first on the source and then on the target
+ * snapshot-able directory. Only the "delete" retention policy is supported: the newest
+ * {@code numSnapshots} snapshots are always kept, and of the remaining ones those last
+ * modified before the age limit are deleted.
+ */
+public class HdfsSnapshotEvictor extends Configured implements Tool {
+    private static final Logger LOG = LoggerFactory.getLogger(HdfsSnapshotEvictor.class);
+
+    /** The only retention policy this tool implements (matched case-insensitively). */
+    private static final String DELETE_POLICY = "delete";
+
+    public static void main(String[] args) throws Exception {
+        Configuration conf = OozieActionConfigurationHelper.createActionConf();
+        int ret = ToolRunner.run(conf, new HdfsSnapshotEvictor(), args);
+        if (ret != 0) {
+            throw new Exception("Unable to perform eviction action args: " + Arrays.toString(args));
+        }
+    }
+
+    /**
+     * Parses the arguments and evicts snapshots on the source, then on the target.
+     *
+     * @return 0 on success
+     * @throws Exception if the arguments cannot be parsed, a retention policy is missing or
+     *                   unsupported, or eviction fails
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        CommandLine cmd = getCommand(args);
+        DistributedFileSystem sourceFs = HdfsSnapshotUtil.getSourceFileSystem(cmd);
+        DistributedFileSystem targetFs = HdfsSnapshotUtil.getTargetFileSystem(cmd);
+
+        String sourceDir = cmd.getOptionValue(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName());
+        String targetDir = cmd.getOptionValue(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName());
+
+        evictForSide(cmd, "source", sourceFs, sourceDir,
+                HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_POLICY.getName(),
+                HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_AGE_LIMIT.getName(),
+                HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_NUMBER.getName());
+
+        evictForSide(cmd, "target", targetFs, targetDir,
+                HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_POLICY.getName(),
+                HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_AGE_LIMIT.getName(),
+                HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_NUMBER.getName());
+
+        LOG.info("Completed HDFS Snapshot Eviction.");
+        return 0;
+    }
+
+    /** Reads the retention settings of one side ("source" or "target") and evicts accordingly. */
+    private static void evictForSide(CommandLine cmd, String side, DistributedFileSystem fs, String dirName,
+                                     String policyOption, String ageLimitOption, String numberOption)
+        throws FalconException {
+        String retPolicy = cmd.getOptionValue(policyOption);
+        String ageLimit = cmd.getOptionValue(ageLimitOption);
+        int numSnapshots = Integer.parseInt(cmd.getOptionValue(numberOption));
+
+        // The policy option is optional, so retPolicy may be null; compare constant-first so a
+        // missing policy is reported as unsupported instead of failing with a NullPointerException.
+        if (!DELETE_POLICY.equalsIgnoreCase(retPolicy)) {
+            LOG.warn("Unsupported {} retention policy {}", side, retPolicy);
+            throw new FalconException("Unsupported " + side + " retention policy " + retPolicy);
+        }
+        evictSnapshots(fs, dirName, ageLimit, numSnapshots);
+    }
+
+    /**
+     * Deletes old snapshots of {@code dirName}.
+     *
+     * <p>Snapshots are ordered by modification time. The newest {@code numSnapshots} are always
+     * retained; each of the others is deleted only if it was last modified before
+     * {@code now - ageLimit}. A negative {@code numSnapshots} is treated as zero.
+     *
+     * @param fs           file system holding the directory
+     * @param dirName      snapshot-able directory; one trailing separator is ignored
+     * @param ageLimit     expression evaluated to milliseconds by {@link EvictionHelper}
+     * @param numSnapshots number of newest snapshots to retain regardless of age
+     * @throws FalconException if the age limit cannot be evaluated or a file system call fails
+     */
+    protected static void evictSnapshots(DistributedFileSystem fs, String dirName, String ageLimit,
+                                         int numSnapshots) throws FalconException {
+        try {
+            LOG.info("Started evicting snapshots on dir {}{} using agelimit {}, numSnapshot {}",
+                    fs.getUri(), dirName, ageLimit, numSnapshots);
+
+            long evictionTime = System.currentTimeMillis()
+                    - EvictionHelper.evalExpressionToMilliSeconds(ageLimit);
+
+            dirName = StringUtils.removeEnd(dirName, Path.SEPARATOR);
+            String snapshotDir = dirName + Path.SEPARATOR + HdfsSnapshotUtil.SNAPSHOT_DIR_PREFIX + Path.SEPARATOR;
+            FileStatus[] snapshots = fs.listStatus(new Path(snapshotDir));
+
+            // Guard against a negative retain count, which would overrun the array below.
+            int retain = Math.max(numSnapshots, 0);
+            if (snapshots.length <= retain) {
+                // no eviction needed
+                return;
+            }
+
+            // Sort by last modified time, ascending order.
+            Arrays.sort(snapshots, new Comparator<FileStatus>() {
+                @Override
+                public int compare(FileStatus f1, FileStatus f2) {
+                    return Long.compare(f1.getModificationTime(), f2.getModificationTime());
+                }
+            });
+
+            // Only the oldest (length - retain) snapshots are candidates for deletion.
+            int candidates = snapshots.length - retain;
+            for (int i = 0; i < candidates; i++) {
+                if (snapshots[i].getModificationTime() < evictionTime) {
+                    fs.deleteSnapshot(new Path(dirName), snapshots[i].getPath().getName());
+                }
+            }
+        } catch (ELException ele) {
+            LOG.warn("Unable to parse retention age limit {} {}", ageLimit, ele.getMessage());
+            throw new FalconException("Unable to parse retention age limit " + ageLimit, ele);
+        } catch (IOException ioe) {
+            // The exception is passed as the trailing argument so the logger prints its stack trace.
+            LOG.warn("Unable to evict snapshots from dir {}", dirName, ioe);
+            throw new FalconException("Unable to evict snapshots from dir " + dirName, ioe);
+        }
+    }
+
+    /**
+     * Parses the evictor command line. Every option takes a value; the Kerberos principal and
+     * retention policy options are optional, all others are required.
+     */
+    private CommandLine getCommand(String[] args) throws org.apache.commons.cli.ParseException {
+        Options options = new Options();
+
+        addArgument(options, HdfsSnapshotMirrorProperties.SOURCE_NN.getName(), "Source Cluster", true);
+        addArgument(options, HdfsSnapshotMirrorProperties.SOURCE_EXEC_URL.getName(),
+                "Replication instance job Exec Url", true);
+        addArgument(options, HdfsSnapshotMirrorProperties.SOURCE_NN_KERBEROS_PRINCIPAL.getName(),
+                "Replication instance job NN Kerberos Principal", false);
+        addArgument(options, HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName(),
+                "Source snapshot-able dir to replicate", true);
+
+        addArgument(options, HdfsSnapshotMirrorProperties.TARGET_NN.getName(), "Target Cluster", true);
+        addArgument(options, HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName(),
+                "Target snapshot-able dir to replicate", true);
+        addArgument(options, HdfsSnapshotMirrorProperties.TARGET_EXEC_URL.getName(),
+                "Replication instance target Exec Url", true);
+        addArgument(options, HdfsSnapshotMirrorProperties.TARGET_NN_KERBEROS_PRINCIPAL.getName(),
+                "Replication instance target NN Kerberos Principal", false);
+
+        addArgument(options, HdfsSnapshotMirrorProperties.SNAPSHOT_JOB_NAME.getName(),
+                "Replication instance job name", true);
+
+        addArgument(options, HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_POLICY.getName(),
+                "Source retention policy", false);
+        addArgument(options, HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_AGE_LIMIT.getName(),
+                "Source delete snapshots older than agelimit", true);
+        addArgument(options, HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_NUMBER.getName(),
+                "Source number of snapshots to retain", true);
+
+        addArgument(options, HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_POLICY.getName(),
+                "Target retention policy", false);
+        addArgument(options, HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_AGE_LIMIT.getName(),
+                "Target delete snapshots older than agelimit", true);
+        addArgument(options, HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_NUMBER.getName(),
+                "Target number of snapshots to retain", true);
+
+        return new GnuParser().parse(options, args);
+    }
+
+    /** Registers one value-taking option with the given name, description and required flag. */
+    private static void addArgument(Options options, String name, String description, boolean required) {
+        Option option = new Option(name, true, description);
+        option.setRequired(required);
+        options.addOption(option);
+    }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/util/HdfsSnapshotUtil.java
----------------------------------------------------------------------
diff --git a/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/util/HdfsSnapshotUtil.java b/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/util/HdfsSnapshotUtil.java
new file mode 100644
index 0000000..5196791
--- /dev/null
+++ b/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/util/HdfsSnapshotUtil.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.snapshots.util;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirrorProperties;
+import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirroringExtension;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+
+/**
+ * Util class for HDFS snapshot based mirroring.
+ */
+public final class HdfsSnapshotUtil {
+
+    public static final String SNAPSHOT_PREFIX = "falcon-snapshot-";
+    public static final String SNAPSHOT_DIR_PREFIX = ".snapshot";
+
+    private HdfsSnapshotUtil() {}
+
+    /**
+     * Creates the file system of the source cluster from the source NN, exec url and
+     * (optional) NN Kerberos principal options of the parsed command line.
+     */
+    public static DistributedFileSystem getSourceFileSystem(CommandLine cmd) throws FalconException {
+        return createFileSystem(cmd,
+                HdfsSnapshotMirrorProperties.SOURCE_NN.getName(),
+                HdfsSnapshotMirrorProperties.SOURCE_EXEC_URL.getName(),
+                HdfsSnapshotMirrorProperties.SOURCE_NN_KERBEROS_PRINCIPAL.getName());
+    }
+
+    /**
+     * Creates the file system of the target cluster from the target NN, exec url and
+     * (optional) NN Kerberos principal options of the parsed command line.
+     */
+    public static DistributedFileSystem getTargetFileSystem(CommandLine cmd) throws FalconException {
+        return createFileSystem(cmd,
+                HdfsSnapshotMirrorProperties.TARGET_NN.getName(),
+                HdfsSnapshotMirrorProperties.TARGET_EXEC_URL.getName(),
+                HdfsSnapshotMirrorProperties.TARGET_NN_KERBEROS_PRINCIPAL.getName());
+    }
+
+    /**
+     * Normalizes a Kerberos principal option value.
+     *
+     * @param principal option value; may be {@code null} when the option was not supplied
+     * @return {@code null} if the value is missing or equals the empty-principal marker,
+     *         otherwise the value unchanged
+     */
+    public static String parseKerberosPrincipal(String principal) {
+        // The principal options are optional, so a missing value must not be dereferenced.
+        if (principal == null || principal.equals(HdfsSnapshotMirroringExtension.EMPTY_KERBEROS_PRINCIPAL)) {
+            return null;
+        }
+        return principal;
+    }
+
+    /** Builds a cluster configuration from the three named options and opens its file system. */
+    private static DistributedFileSystem createFileSystem(CommandLine cmd, String storageUrlOption,
+                                                          String execUrlOption, String principalOption)
+        throws FalconException {
+        String storageUrl = cmd.getOptionValue(storageUrlOption);
+        String executeEndpoint = cmd.getOptionValue(execUrlOption);
+        String kerberosPrincipal = parseKerberosPrincipal(cmd.getOptionValue(principalOption));
+
+        Configuration conf = ClusterHelper.getConfiguration(storageUrl, executeEndpoint, kerberosPrincipal);
+        return HadoopClientFactory.get().createDistributedProxiedFileSystem(conf);
+    }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicatorTest.java
----------------------------------------------------------------------
diff --git a/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicatorTest.java b/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicatorTest.java
new file mode 100644
index 0000000..7924214
--- /dev/null
+++ b/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicatorTest.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.snapshots.replication;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.MiniHdfsClusterUtil;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirrorProperties;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.InputStream;
+import java.nio.file.Files;
+
+/**
+ * Hdfs Snapshot replicator unit tests.
+ *
+ * Extends HdfsSnapshotReplicator so that getCommand and invokeCopy can be called directly.
+ * Source and target directories both live on one mini DFS cluster.
+ */
+public class HdfsSnapshotReplicatorTest extends HdfsSnapshotReplicator {
+ private MiniDFSCluster miniDFSCluster;
+ private DistributedFileSystem miniDfs;
+ private File baseDir;
+ private Cluster sourceCluster;
+ private Cluster targetCluster;
+ // Assigned in replicationTest and read again in removeSnapshotabilityOnTargetTest.
+ private String sourceStorageUrl;
+ private String targetStorageUrl;
+ private Path sourceDir = new Path("/apps/falcon/snapshot-replication/sourceDir/");
+ private Path targetDir = new Path("/apps/falcon/snapshot-replication/targetDir/");
+
+ private FsPermission fsPermission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
+
+ // Replicator arguments; source and target NN use the same url.
+ private String[] args = {"--" + HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName(), "1",
+ "--" + HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName(), "100",
+ "--" + HdfsSnapshotMirrorProperties.SOURCE_NN.getName(), "hdfs://localhost:54136",
+ "--" + HdfsSnapshotMirrorProperties.SOURCE_EXEC_URL.getName(), "localhost:8021",
+ "--" + HdfsSnapshotMirrorProperties.TARGET_EXEC_URL.getName(), "localhost:8021",
+ "--" + HdfsSnapshotMirrorProperties.TARGET_NN.getName(), "hdfs://localhost:54136",
+ "--" + HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName(),
+ "/apps/falcon/snapshot-replication/sourceDir/",
+ "--" + HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName(),
+ "/apps/falcon/snapshot-replication/targetDir/",
+ "--" + HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName(), "false",
+ "--" + HdfsSnapshotMirrorProperties.SNAPSHOT_JOB_NAME.getName(), "snapshotJobName", };
+
+ /**
+ * Starts the mini DFS cluster, publishes both cluster entities, creates the source and
+ * target directories, allows snapshots on them and parses the replicator arguments.
+ */
+ @BeforeClass
+ public void init() throws Exception {
+ baseDir = Files.createTempDirectory("test_snapshot-replication").toFile().getAbsoluteFile();
+ miniDFSCluster = MiniHdfsClusterUtil.initMiniDfs(MiniHdfsClusterUtil.SNAPSHOT_REPL_TEST_PORT, baseDir);
+ miniDfs = miniDFSCluster.getFileSystem();
+
+ sourceCluster = initCluster("/primary-cluster-0.1.xml");
+ targetCluster = initCluster("/backup-cluster-0.1.xml");
+
+ miniDfs.mkdirs(sourceDir, fsPermission);
+ miniDfs.mkdirs(targetDir, fsPermission);
+
+ miniDfs.allowSnapshot(sourceDir);
+ miniDfs.allowSnapshot(targetDir);
+
+ // cmd is not declared in this class; presumably the parent's field that invokeCopy reads.
+ cmd = getCommand(args);
+ Assert.assertEquals(cmd.getOptionValue(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName()), "1");
+ Assert.assertEquals(cmd.getOptionValue(HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName()), "100");
+
+ }
+
+ /** Unmarshals the cluster entity from the given classpath resource and publishes it to the store. */
+ private Cluster initCluster(String clusterName) throws Exception {
+ InputStream inputStream = getClass().getResourceAsStream(clusterName);
+ Cluster cluster = (Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(inputStream);
+ ConfigurationStore.get().publish(EntityType.CLUSTER, cluster);
+ return cluster;
+ }
+
+ /**
+ * Runs three replication rounds: add dir1, add dir2, delete dir1. After each copy a snapshot
+ * with the same name is created on the target.
+ */
+ @Test
+ public void replicationTest() throws Exception {
+ Configuration sourceConf = ClusterHelper.getConfiguration(sourceCluster);
+ this.setConf(sourceConf);
+ Configuration targetConf = ClusterHelper.getConfiguration(targetCluster);
+ sourceStorageUrl = ClusterHelper.getStorageUrl(sourceCluster);
+ targetStorageUrl = ClusterHelper.getStorageUrl(targetCluster);
+
+ DistributedFileSystem sourceFs = HadoopClientFactory.get().createDistributedProxiedFileSystem(sourceConf);
+ DistributedFileSystem targetFs = HadoopClientFactory.get().createDistributedProxiedFileSystem(targetConf);
+
+ // create dir1, create snapshot, invoke copy, check file in target, create snapshot on target
+ Path dir1 = new Path(sourceDir, "dir1");
+ miniDfs.mkdir(dir1, fsPermission);
+ miniDfs.createSnapshot(sourceDir, "snapshot1");
+ invokeCopy(sourceStorageUrl, targetStorageUrl, sourceFs, targetFs,
+ sourceDir.toString(), targetDir.toString(), "snapshot1");
+ miniDfs.createSnapshot(targetDir, "snapshot1");
+ Assert.assertTrue(miniDfs.exists(new Path(targetDir, "dir1")));
+
+ // create dir2, create snapshot, invoke copy, check dir in target, create snapshot on target
+ Path dir2 = new Path(sourceDir, "dir2");
+ miniDfs.mkdir(dir2, fsPermission);
+ miniDfs.createSnapshot(sourceDir, "snapshot2");
+ invokeCopy(sourceStorageUrl, targetStorageUrl, sourceFs, targetFs,
+ sourceDir.toString(), targetDir.toString(), "snapshot2");
+ miniDfs.createSnapshot(targetDir, "snapshot2");
+ Assert.assertTrue(miniDfs.exists(new Path(targetDir, "dir1")));
+ Assert.assertTrue(miniDfs.exists(new Path(targetDir, "dir2")));
+
+ // delete dir1, create snapshot, invoke copy, check file not in target
+ miniDfs.delete(dir1, true);
+ miniDfs.createSnapshot(sourceDir, "snapshot3");
+ invokeCopy(sourceStorageUrl, targetStorageUrl, sourceFs, targetFs,
+ sourceDir.toString(), targetDir.toString(), "snapshot3");
+ miniDfs.createSnapshot(targetDir, "snapshot3");
+ Assert.assertFalse(miniDfs.exists(new Path(targetDir, "dir1")));
+ Assert.assertTrue(miniDfs.exists(new Path(targetDir, "dir2")));
+ }
+
+ /**
+ * Expects invokeCopy to fail with a FalconException once the target directory no longer
+ * allows snapshots. Depends on replicationTest, which creates the three target snapshots
+ * deleted here and assigns the storage urls.
+ */
+ @Test(dependsOnMethods = "replicationTest",
+ expectedExceptions = FalconException.class,
+ expectedExceptionsMessageRegExp = "Unable to find latest snapshot on targetDir "
+ + "/apps/falcon/snapshot-replication/targetDir")
+ public void removeSnapshotabilityOnTargetTest() throws Exception {
+ // remove snapshotability on target, create snapshot on source, invoke copy should fail
+ miniDfs.deleteSnapshot(targetDir, "snapshot1");
+ miniDfs.deleteSnapshot(targetDir, "snapshot2");
+ miniDfs.deleteSnapshot(targetDir, "snapshot3");
+
+ miniDfs.disallowSnapshot(targetDir);
+ // Note: the local is named dir1 but points at "dir4".
+ Path dir1 = new Path(sourceDir, "dir4");
+ miniDfs.mkdir(dir1, fsPermission);
+ miniDfs.createSnapshot(sourceDir, "snapshot4");
+ invokeCopy(sourceStorageUrl, targetStorageUrl, miniDfs, miniDfs,
+ sourceDir.toString(), targetDir.toString(), "snapshot4");
+ }
+
+ /** Shuts down the mini DFS cluster and cleans up its base directory. */
+ @AfterClass
+ public void cleanup() throws Exception {
+ MiniHdfsClusterUtil.cleanupDfs(miniDFSCluster, baseDir);
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictorTest.java
----------------------------------------------------------------------
diff --git a/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictorTest.java b/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictorTest.java
new file mode 100644
index 0000000..a73c399
--- /dev/null
+++ b/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/retention/HdfsSnapshotEvictorTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.snapshots.retention;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.MiniHdfsClusterUtil;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+
+/**
+ * Hdfs snapshot evictor unit tests.
+ *
+ * Extends HdfsSnapshotEvictor so that the protected static evictSnapshots can be called directly.
+ */
+public class HdfsSnapshotEvictorTest extends HdfsSnapshotEvictor {
+
+ // Number of snapshots created for the eviction test.
+ private static final int NUM_FILES = 7;
+ private MiniDFSCluster miniDFSCluster;
+ private DistributedFileSystem miniDfs;
+ private File baseDir;
+ private Path evictionDir = new Path("/apps/falcon/snapshot-eviction/");
+ private FsPermission fsPermission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
+
+ /** Starts the mini DFS cluster, creates the snapshot-able eviction dir and fills it with snapshots. */
+ @BeforeClass
+ public void init() throws Exception {
+ baseDir = Files.createTempDirectory("test_snapshot-eviction_hdfs").toFile().getAbsoluteFile();
+ miniDFSCluster = MiniHdfsClusterUtil.initMiniDfs(MiniHdfsClusterUtil.SNAPSHOT_EVICTION_TEST_PORT, baseDir);
+ miniDfs = miniDFSCluster.getFileSystem();
+ miniDfs.mkdirs(evictionDir, fsPermission);
+ miniDfs.allowSnapshot(evictionDir);
+ createSnapshotsForEviction();
+ }
+
+ /**
+ * Creates NUM_FILES snapshots named "0".."6", sleeping 10 seconds after each one.
+ * NOTE(review): setup therefore takes about 70 seconds; the spacing presumably exists so that
+ * only the oldest snapshots exceed the one minute age limit used in evictionTest - confirm.
+ */
+ private void createSnapshotsForEviction() throws Exception {
+ for (int i = 0; i < NUM_FILES; i++) {
+ miniDfs.createSnapshot(evictionDir, String.valueOf(i));
+ Thread.sleep(10000);
+ }
+ }
+
+ /** Evicts with a one minute age limit and decreasing retain counts, checking what is left each time. */
+ @Test
+ public void evictionTest() throws Exception {
+ Path snapshotDir = new Path(evictionDir, ".snapshot");
+ FileStatus[] fileStatuses = miniDfs.listStatus(snapshotDir);
+ Assert.assertEquals(fileStatuses.length, NUM_FILES);
+
+ // Retain count above the number of snapshots: nothing may be deleted.
+ evictSnapshots(miniDfs, evictionDir.toString(), "minutes(1)", NUM_FILES + 1);
+ fileStatuses = miniDfs.listStatus(snapshotDir);
+ Assert.assertEquals(fileStatuses.length, NUM_FILES);
+
+ // One snapshot over the retain count: expected to be deleted, i.e. to be older than a minute.
+ evictSnapshots(miniDfs, evictionDir.toString(), "minutes(1)", NUM_FILES - 1);
+ fileStatuses = miniDfs.listStatus(snapshotDir);
+ Assert.assertEquals(fileStatuses.length, NUM_FILES - 1);
+
+ // Retain count of 2: the age limit is expected to spare most candidates.
+ // NOTE(review): the >= 5 bound depends on wall-clock timing of the setup - confirm it is stable.
+ evictSnapshots(miniDfs, evictionDir.toString(), "minutes(1)", 2);
+ fileStatuses = miniDfs.listStatus(snapshotDir);
+ Assert.assertTrue(fileStatuses.length >= 5);
+ }
+
+ /** Expects a FalconException when evicting from a directory on which snapshots were never allowed. */
+ @Test(expectedExceptions = FalconException.class,
+ expectedExceptionsMessageRegExp = "Unable to evict snapshots from dir /apps/falcon/non-snapshot-eviction")
+ public void evictionTestCannotSnapshot() throws Exception {
+ Path nonSnapshotDir = new Path("/apps/falcon/non-snapshot-eviction/");
+ miniDfs.mkdirs(nonSnapshotDir, fsPermission);
+ evictSnapshots(miniDfs, nonSnapshotDir.toString(), "minutes(1)", NUM_FILES);
+ }
+
+ /** Shuts down the mini DFS cluster and cleans up its base directory. */
+ @AfterClass
+ public void cleanup() throws Exception {
+ MiniHdfsClusterUtil.cleanupDfs(miniDFSCluster, baseDir);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/hdfs-snapshot-mirroring/src/test/resources/backup-cluster-0.1.xml
----------------------------------------------------------------------
diff --git a/addons/hdfs-snapshot-mirroring/src/test/resources/backup-cluster-0.1.xml b/addons/hdfs-snapshot-mirroring/src/test/resources/backup-cluster-0.1.xml
new file mode 100644
index 0000000..f89237e
--- /dev/null
+++ b/addons/hdfs-snapshot-mirroring/src/test/resources/backup-cluster-0.1.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<!--
+ Test-resource cluster entity named "backupSnapshotRepl". Every interface endpoint
+ uses a localhost address, except the registry interface whose endpoint is the literal "Hcat".
+ -->
+<cluster colo="gs" description="" name="backupSnapshotRepl" xmlns="uri:falcon:cluster:0.1"
+ >
+ <interfaces>
+ <interface type="readonly" endpoint="hftp://localhost:50010"
+ version="0.20.2"/>
+ <interface type="write" endpoint="hdfs://localhost:54136"
+ version="0.20.2"/>
+ <interface type="execute" endpoint="localhost:8021" version="0.20.2"/>
+ <interface type="workflow" endpoint="http://localhost:11000/oozie/"
+ version="4.0"/>
+ <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
+ version="5.1.6"/>
+ <interface type="registry" endpoint="Hcat" version="1"/>
+ </interfaces>
+ <locations>
+ <location name="staging" path="/projects/falcon/staging"/>
+ <location name="temp" path="/tmp"/>
+ <location name="working" path="/projects/falcon/working"/>
+ </locations>
+ <properties>
+ <property name="field1" value="value1"/>
+ <property name="field2" value="value2"/>
+ <property name="hive.metastore.client.socket.timeout" value="20"/>
+ </properties>
+</cluster>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/addons/hdfs-snapshot-mirroring/src/test/resources/primary-cluster-0.1.xml
----------------------------------------------------------------------
diff --git a/addons/hdfs-snapshot-mirroring/src/test/resources/primary-cluster-0.1.xml b/addons/hdfs-snapshot-mirroring/src/test/resources/primary-cluster-0.1.xml
new file mode 100644
index 0000000..30c6242
--- /dev/null
+++ b/addons/hdfs-snapshot-mirroring/src/test/resources/primary-cluster-0.1.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<!--
+ Test-resource cluster entity named "primarySnapshotRepl". Every interface endpoint
+ uses a localhost address, except the registry interface whose endpoint is the literal "Hcat".
+ -->
+<cluster colo="gs" description="" name="primarySnapshotRepl" xmlns="uri:falcon:cluster:0.1">
+ <interfaces>
+ <interface type="readonly" endpoint="hftp://localhost:50010"
+ version="0.20.2"/>
+ <interface type="write" endpoint="hdfs://localhost:54136"
+ version="0.20.2"/>
+ <interface type="execute" endpoint="localhost:8021" version="0.20.2"/>
+ <interface type="workflow" endpoint="http://localhost:11000/oozie/"
+ version="4.0"/>
+ <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
+ version="5.1.6"/>
+ <interface type="registry" endpoint="Hcat" version="1"/>
+ </interfaces>
+ <locations>
+ <location name="staging" path="/projects/falcon/staging"/>
+ <location name="temp" path="/tmp"/>
+ <location name="working" path="/projects/falcon/working"/>
+ </locations>
+ <properties>
+ <property name="field1" value="value1"/>
+ <property name="field2" value="value2"/>
+ <property name="hive.metastore.client.socket.timeout" value="20"/>
+ </properties>
+</cluster>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
index 24ba7d7..9d79742 100644
--- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
@@ -18,6 +18,7 @@
package org.apache.falcon.entity;
+import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
@@ -28,6 +29,7 @@ import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.cluster.Location;
import org.apache.falcon.entity.v0.cluster.Property;
import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.security.SecurityUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -71,6 +73,18 @@ public final class ClusterHelper {
return conf;
}
+ /**
+ * Builds a fresh Configuration from explicit endpoints rather than from a Cluster entity.
+ *
+ * @param storageUrl value stored under HadoopClientFactory.FS_DEFAULT_NAME_KEY.
+ * @param executeEndPoint value stored under both HadoopClientFactory.MR_JT_ADDRESS_KEY
+ * and HadoopClientFactory.YARN_RM_ADDRESS_KEY.
+ * @param kerberosPrincipal stored under SecurityUtil.NN_PRINCIPAL; skipped when blank.
+ * @return the populated Configuration.
+ */
+ public static Configuration getConfiguration(String storageUrl, String executeEndPoint,
+ String kerberosPrincipal) {
+ final Configuration configuration = new Configuration();
+ configuration.set(HadoopClientFactory.FS_DEFAULT_NAME_KEY, storageUrl);
+ // the same execute endpoint serves as both the MR job tracker and the YARN RM address
+ configuration.set(HadoopClientFactory.MR_JT_ADDRESS_KEY, executeEndPoint);
+ configuration.set(HadoopClientFactory.YARN_RM_ADDRESS_KEY, executeEndPoint);
+ final boolean hasPrincipal = StringUtils.isNotBlank(kerberosPrincipal);
+ if (hasPrincipal) {
+ configuration.set(SecurityUtil.NN_PRINCIPAL, kerberosPrincipal);
+ }
+ return configuration;
+ }
+
public static String getOozieUrl(Cluster cluster) {
return getInterface(cluster, Interfacetype.WORKFLOW).getEndpoint();
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
index e970439..3d6b16b 100644
--- a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
+++ b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
@@ -132,6 +133,28 @@ public final class HadoopClientFactory {
}
}
+ /**
+ * Creates a DistributedFileSystem for the name node configured in conf, acting as the
+ * proxy user obtained from CurrentUser.
+ *
+ * @param conf Configuration holding the name node address under FS_DEFAULT_NAME_KEY; must not be null.
+ * @return DistributedFileSystem created on behalf of the current proxy user.
+ * @throws org.apache.falcon.FalconException
+ * if the proxy user cannot be resolved, the name node is not a valid URI,
+ * or the filesystem could not be created.
+ */
+ public DistributedFileSystem createDistributedProxiedFileSystem(final Configuration conf) throws FalconException {
+ Validate.notNull(conf, "configuration cannot be null");
+
+ final String nameNode = getNameNode(conf);
+
+ // resolve the proxy user first, keeping the original evaluation order
+ final UserGroupInformation proxyUgi;
+ try {
+ proxyUgi = CurrentUser.getProxyUGI();
+ } catch (IOException e) {
+ throw new FalconException("Exception while getting Distributed FileSystem for proxy: "
+ + CurrentUser.getUser(), e);
+ }
+
+ final URI nameNodeUri;
+ try {
+ nameNodeUri = new URI(nameNode);
+ } catch (URISyntaxException e) {
+ throw new FalconException("Exception while getting Distributed FileSystem for: " + nameNode, e);
+ }
+
+ return createDistributedFileSystem(proxyUgi, nameNodeUri, conf);
+ }
+
private static String getNameNode(Configuration conf) {
return conf.get(FS_DEFAULT_NAME_KEY);
}
@@ -172,19 +195,7 @@ public final class HadoopClientFactory {
@SuppressWarnings("ResultOfMethodCallIgnored")
public FileSystem createFileSystem(UserGroupInformation ugi, final URI uri,
final Configuration conf) throws FalconException {
- Validate.notNull(ugi, "ugi cannot be null");
- Validate.notNull(conf, "configuration cannot be null");
-
- try {
- if (UserGroupInformation.isSecurityEnabled()) {
- ugi.checkTGTAndReloginFromKeytab();
- }
- } catch (IOException ioe) {
- throw new FalconException("Exception while getting FileSystem. Unable to check TGT for user "
- + ugi.getShortUserName(), ioe);
- }
-
- validateNameNode(uri, conf);
+ validateInputs(ugi, uri, conf);
try {
// prevent falcon impersonating falcon, no need to use doas
@@ -201,13 +212,65 @@ public final class HadoopClientFactory {
return FileSystem.get(uri, conf);
}
});
- } catch (InterruptedException ex) {
+ } catch (InterruptedException | IOException ex) {
throw new FalconException("Exception creating FileSystem:" + ex.getMessage(), ex);
- } catch (IOException ex) {
+ }
+ }
+
+ /**
+ * Return a DistributedFileSystem created with the provided user for the specified URI.
+ *
+ * The filesystem is obtained through FileSystem.get, directly when ugi is the login
+ * user and inside ugi.doAs otherwise. A URI that does not resolve to a
+ * DistributedFileSystem is reported as a FalconException rather than a ClassCastException.
+ *
+ * @param ugi user group information
+ * @param uri file system URI.
+ * @param conf Configuration with all necessary information to create the FileSystem.
+ * @return DistributedFileSystem created with the provided user/group.
+ * @throws org.apache.falcon.FalconException
+ * if the filesystem could not be created or is not a DistributedFileSystem.
+ */
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ public DistributedFileSystem createDistributedFileSystem(UserGroupInformation ugi, final URI uri,
+ final Configuration conf) throws FalconException {
+ validateInputs(ugi, uri, conf);
+ final FileSystem returnFs;
+ try {
+ // prevent falcon impersonating falcon, no need to use doas
+ final String proxyUserName = ugi.getShortUserName();
+ if (proxyUserName.equals(UserGroupInformation.getLoginUser().getShortUserName())) {
+ LOG.info("Creating Distributed FS for the login user {}, impersonation not required",
+ proxyUserName);
+ // get() is a static method of FileSystem; call it on the declaring class
+ returnFs = FileSystem.get(uri, conf);
+ } else {
+ LOG.info("Creating FS impersonating user {}", proxyUserName);
+ returnFs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+ @Override
+ public FileSystem run() throws Exception {
+ return FileSystem.get(uri, conf);
+ }
+ });
+ }
+
+ // FileSystem.get picks the implementation from the URI scheme, so guard the downcast
+ if (!(returnFs instanceof DistributedFileSystem)) {
+ throw new FalconException("FileSystem for " + uri + " is not a DistributedFileSystem: "
+ + (returnFs == null ? "null" : returnFs.getClass().getName()));
+ }
+ return (DistributedFileSystem) returnFs;
+ } catch (InterruptedException | IOException ex) {
+ if (ex instanceof InterruptedException) {
+ // restore the interrupt flag so callers further up can still observe it
+ Thread.currentThread().interrupt();
+ }
throw new FalconException("Exception creating FileSystem:" + ex.getMessage(), ex);
}
}
+ /**
+ * Shared precondition checks for the filesystem factory methods: rejects a null ugi or
+ * conf, re-checks the TGT when security is enabled, then runs validateNameNode on the URI.
+ *
+ * @param ugi user group information; must not be null.
+ * @param uri file system URI passed on to validateNameNode.
+ * @param conf configuration; must not be null.
+ * @throws FalconException if the TGT check fails or validateNameNode rejects the inputs.
+ */
+ private void validateInputs(UserGroupInformation ugi, final URI uri,
+ final Configuration conf) throws FalconException {
+ Validate.notNull(ugi, "ugi cannot be null");
+ Validate.notNull(conf, "configuration cannot be null");
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ try {
+ ugi.checkTGTAndReloginFromKeytab();
+ } catch (IOException tgtFailure) {
+ throw new FalconException("Exception while getting FileSystem. Unable to check TGT for user "
+ + ugi.getShortUserName(), tgtFailure);
+ }
+ }
+
+ validateNameNode(uri, conf);
+ }
+
/**
* This method validates if the execute url is able to reach the MR endpoint.
*