You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2013/11/07 20:15:35 UTC
[2/3] git commit: FALCON-169 multiple "/" in target for replication
for multi target feed. Contributed by Venkatesh Seetharam
FALCON-169 multiple "/" in target for replication for multi target feed. Contributed by Venkatesh Seetharam
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/b3b42ef9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/b3b42ef9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/b3b42ef9
Branch: refs/heads/FALCON-85
Commit: b3b42ef94c4c1d750331f89b5b111104a9a95548
Parents: 023f9f4
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Thu Nov 7 11:08:23 2013 -0800
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Thu Nov 7 11:08:23 2013 -0800
----------------------------------------------------------------------
.../apache/falcon/entity/FileSystemStorage.java | 7 +-
.../falcon/entity/FileSystemStorageTest.java | 24 +++++--
.../lifecycle/FileSystemFeedReplicationIT.java | 67 +++++++++++++++++-
.../table/complex-replicating-feed.xml | 71 ++++++++++++++++++++
.../resources/table/target-cluster-alpha.xml | 2 +-
.../resources/table/target-cluster-beta.xml | 2 +-
6 files changed, 161 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b3b42ef9/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
index 72d9e07..68370c7 100644
--- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -23,6 +23,7 @@ import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.hadoop.fs.Path;
import java.net.URI;
import java.net.URISyntaxException;
@@ -164,10 +165,8 @@ public class FileSystemStorage implements Storage {
return "/tmp";
}
- StringBuilder uriTemplate = new StringBuilder();
- uriTemplate.append(storageUrl);
- uriTemplate.append(locationForType.getPath());
- return uriTemplate.toString();
+ // normalize the location path so that trailing and repeated '/' separators are removed
+ return storageUrl + new Path(locationForType.getPath());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b3b42ef9/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
index a059652..6917472 100644
--- a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
@@ -21,6 +21,7 @@ package org.apache.falcon.entity;
import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.testng.Assert;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.ArrayList;
@@ -135,16 +136,29 @@ public class FileSystemStorageTest {
Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), "${nameNode}/foo/bar");
}
- @Test
- public void testGetUriTemplateWithLocationType() throws Exception {
+ @DataProvider(name = "locationTestDataProvider")
+ private Object[][] createLocationTestData() {
+ return new Object[][] {
+ {"hdfs://localhost:41020", "/localDC/rc/billing/ua2", "/localDC/rc/billing/ua2"},
+ {"hdfs://localhost:41020", "/localDC/rc/billing/ua2/", "/localDC/rc/billing/ua2"},
+ {"hdfs://localhost:41020", "/localDC/rc/billing/ua2//", "/localDC/rc/billing/ua2"},
+ {"${nameNode}", "/localDC/rc/billing/ua2", "/localDC/rc/billing/ua2"},
+ {"${nameNode}", "/localDC/rc/billing/ua2/", "/localDC/rc/billing/ua2"},
+ {"${nameNode}", "/localDC/rc/billing/ua2//", "/localDC/rc/billing/ua2"},
+ };
+ }
+
+ @Test (dataProvider = "locationTestDataProvider")
+ public void testGetUriTemplateWithLocationType(String storageUrl, String path,
+ String expected) throws Exception {
final Location location = new Location();
- location.setPath("/foo/bar");
+ location.setPath(path);
location.setType(LocationType.DATA);
List<Location> locations = new ArrayList<Location>();
locations.add(location);
- FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations);
- Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), "hdfs://localhost:41020/foo/bar");
+ FileSystemStorage storage = new FileSystemStorage(storageUrl, locations);
+ Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), storageUrl + expected);
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b3b42ef9/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java
index f00bdd7..058b35c 100644
--- a/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java
@@ -152,6 +152,10 @@ public class FileSystemFeedReplicationIT {
.accept(MediaType.APPLICATION_JSON)
.get(InstancesResult.class);
Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED);
+
+ TestContext.executeWithURL("entity -delete -type feed -name " + feedName);
+ TestContext.executeWithURL("entity -delete -type cluster -name primary-cluster");
+ TestContext.executeWithURL("entity -delete -type cluster -name bcp-cluster");
}
@Test (enabled = false)
@@ -192,7 +196,7 @@ public class FileSystemFeedReplicationIT {
FileSystem beta = FileSystem.get(ClusterHelper.getConfiguration(targetBetaContext.getCluster().getCluster()));
Assert.assertTrue(beta.exists(new Path("/falcon/test/target-cluster-beta/customer_beta/" + PARTITION_VALUE)));
- FileSystem gamma = FileSystem.get(ClusterHelper.getConfiguration(targetAlphaContext.getCluster().getCluster()));
+ FileSystem gamma = FileSystem.get(ClusterHelper.getConfiguration(targetGammaContext.getCluster().getCluster()));
Assert.assertTrue(
gamma.exists(new Path("/falcon/test/target-cluster-gamma/customer_gamma/" + PARTITION_VALUE)));
@@ -201,5 +205,66 @@ public class FileSystemFeedReplicationIT {
.accept(MediaType.APPLICATION_JSON)
.get(InstancesResult.class);
Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED);
+
+ TestContext.executeWithURL("entity -delete -type feed -name " + feedName);
+ TestContext.executeWithURL("entity -delete -type cluster -name primary-cluster");
+ TestContext.executeWithURL("entity -delete -type cluster -name target-cluster-alpha");
+ TestContext.executeWithURL("entity -delete -type cluster -name target-cluster-beta");
+ TestContext.executeWithURL("entity -delete -type cluster -name target-cluster-gamma");
+ }
+
+ @Test (enabled = false)
+ public void testFSReplicationComplex() throws Exception {
+ // copyTestDataToHDFS
+ Cluster sourceCluster = sourceContext.getCluster().getCluster();
+ FileSystem sourceFS = FileSystem.get(ClusterHelper.getConfiguration(sourceCluster));
+ String sourceStorageUrl = ClusterHelper.getStorageUrl(sourceCluster);
+ final String partitionValue = "2012-10-01-12";
+ String sourceLocation = sourceStorageUrl + SOURCE_LOCATION + partitionValue + "/" + sourceCluster.getColo();
+ FSUtils.copyResourceToHDFS("/apps/data/data.txt", "data.txt", sourceLocation);
+ Path sourcePath = new Path(sourceLocation);
+ Assert.assertTrue(sourceFS.exists(sourcePath));
+
+ final Map<String, String> overlay = sourceContext.getUniqueOverlay();
+ String filePath = sourceContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
+ Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+ filePath = targetContext.overlayParametersOverTemplate("/table/target-cluster-alpha.xml", overlay);
+ Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+ filePath = targetContext.overlayParametersOverTemplate("/table/target-cluster-beta.xml", overlay);
+ Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+ // verify if the partition on the source exists - precondition
+ Assert.assertTrue(sourceFS.exists(sourcePath));
+
+ filePath = sourceContext.overlayParametersOverTemplate("/table/complex-replicating-feed.xml", overlay);
+ Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
+
+ // wait until the workflow job completes
+ final String feedName = "complex-replicating-feed";
+ WorkflowJob jobInfo = OozieTestUtils.getWorkflowJob(targetContext.getCluster().getCluster(),
+ OozieClient.FILTER_NAME + "=FALCON_FEED_REPLICATION_" + feedName);
+ Assert.assertEquals(jobInfo.getStatus(), WorkflowJob.Status.SUCCEEDED);
+
+ Assert.assertTrue(sourceFS.exists(new Path(SOURCE_LOCATION + partitionValue)));
+
+ // verify if the partition on the target exists
+ FileSystem alpha = FileSystem.get(ClusterHelper.getConfiguration(targetAlphaContext.getCluster().getCluster()));
+ Assert.assertTrue(alpha.exists(new Path("/localDC/rc/billing/ua1/" + partitionValue)));
+
+ FileSystem beta = FileSystem.get(ClusterHelper.getConfiguration(targetBetaContext.getCluster().getCluster()));
+ Assert.assertTrue(beta.exists(new Path("/localDC/rc/billing/ua2/" + partitionValue)));
+
+ InstancesResult response = targetContext.getService().path("api/instance/running/feed/" + feedName)
+ .header("Remote-User", "guest")
+ .accept(MediaType.APPLICATION_JSON)
+ .get(InstancesResult.class);
+ Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED);
+
+ TestContext.executeWithURL("entity -delete -type feed -name " + feedName);
+ TestContext.executeWithURL("entity -delete -type cluster -name primary-cluster");
+ TestContext.executeWithURL("entity -delete -type cluster -name target-cluster-alpha");
+ TestContext.executeWithURL("entity -delete -type cluster -name target-cluster-beta");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b3b42ef9/webapp/src/test/resources/table/complex-replicating-feed.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/table/complex-replicating-feed.xml b/webapp/src/test/resources/table/complex-replicating-feed.xml
new file mode 100644
index 0000000..35e8986
--- /dev/null
+++ b/webapp/src/test/resources/table/complex-replicating-feed.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<!--
+ Replicating the billing RC file feed from the primary cluster to the alpha and beta target clusters.
+ -->
+<feed description="billing RC File" name="complex-replicating-feed" xmlns="uri:falcon:feed:0.1">
+ <partitions>
+ <partition name="colo"/>
+ <partition name="eventTime"/>
+ <partition name="impressionHour"/>
+ <partition name="pricingModel"/>
+ </partitions>
+
+ <groups>online,bi</groups>
+
+ <frequency>minutes(5)</frequency>
+ <timezone>UTC</timezone>
+ <late-arrival cut-off="minutes(1)"/>
+
+ <clusters>
+ <cluster partition="${cluster.colo}" name="primary-cluster" type="source">
+ <validity end="2099-01-01T00:00Z" start="2012-10-01T12:00Z"/>
+ <retention action="delete" limit="days(10000)"/>
+
+ <locations>
+ <location type="data"
+ path="/falcon/test/primary-cluster/customer_raw/${YEAR}-${MONTH}-${DAY}-${HOUR}/" />
+ </locations>
+ </cluster>
+ <cluster type="target" name="target-cluster-alpha">
+ <validity end="2012-10-01T12:11Z" start="2012-10-01T12:05Z"/>
+ <retention action="delete" limit="days(10000)"/>
+
+ <locations>
+ <location path="/localDC/rc/billing/ua1/${YEAR}-${MONTH}-${DAY}-${HOUR}/" type="data"/>
+ </locations>
+ </cluster>
+ <cluster type="target" name="target-cluster-beta">
+ <validity end="2012-10-01T12:26Z" start="2012-10-01T12:10Z"/>
+ <retention action="delete" limit="days(10000)"/>
+
+ <locations>
+ <location path="/localDC/rc/billing/ua2/${YEAR}-${MONTH}-${DAY}-${HOUR}/" type="data"/>
+ </locations>
+ </cluster>
+ </clusters>
+
+ <locations>
+ <location type="data" path="/falcon/test/customer_raw/${YEAR}-${MONTH}-${DAY}-${HOUR}/" />
+ </locations>
+
+ <ACL owner="seetharam" group="users" permission="0755"/>
+ <schema location="" provider="hcatalog"/>
+
+</feed>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b3b42ef9/webapp/src/test/resources/table/target-cluster-alpha.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/table/target-cluster-alpha.xml b/webapp/src/test/resources/table/target-cluster-alpha.xml
index c62303e..528fd41 100644
--- a/webapp/src/test/resources/table/target-cluster-alpha.xml
+++ b/webapp/src/test/resources/table/target-cluster-alpha.xml
@@ -19,7 +19,7 @@
<!--
target-cluster-alpha configuration for demo vm
-->
-<cluster colo="east-coast" description="BCP Cluster"
+<cluster colo="ua1" description="BCP Cluster"
name="target-cluster-alpha"
xmlns="uri:falcon:cluster:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b3b42ef9/webapp/src/test/resources/table/target-cluster-beta.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/table/target-cluster-beta.xml b/webapp/src/test/resources/table/target-cluster-beta.xml
index 3738575..d2c808a 100644
--- a/webapp/src/test/resources/table/target-cluster-beta.xml
+++ b/webapp/src/test/resources/table/target-cluster-beta.xml
@@ -19,7 +19,7 @@
<!--
target-cluster-beta configuration for demo vm
-->
-<cluster colo="east-coast" description="BCP Cluster"
+<cluster colo="ua2" description="BCP Cluster"
name="target-cluster-beta"
xmlns="uri:falcon:cluster:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">