You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2013/11/07 20:15:35 UTC

[2/3] git commit: FALCON-169 multiple "/" in target for replication for multi target feed. Contributed by Venkatesh Seetharam

FALCON-169 multiple "/" in target for replication for multi target feed. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/b3b42ef9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/b3b42ef9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/b3b42ef9

Branch: refs/heads/FALCON-85
Commit: b3b42ef94c4c1d750331f89b5b111104a9a95548
Parents: 023f9f4
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Thu Nov 7 11:08:23 2013 -0800
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Thu Nov 7 11:08:23 2013 -0800

----------------------------------------------------------------------
 .../apache/falcon/entity/FileSystemStorage.java |  7 +-
 .../falcon/entity/FileSystemStorageTest.java    | 24 +++++--
 .../lifecycle/FileSystemFeedReplicationIT.java  | 67 +++++++++++++++++-
 .../table/complex-replicating-feed.xml          | 71 ++++++++++++++++++++
 .../resources/table/target-cluster-alpha.xml    |  2 +-
 .../resources/table/target-cluster-beta.xml     |  2 +-
 6 files changed, 161 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b3b42ef9/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
index 72d9e07..68370c7 100644
--- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -23,6 +23,7 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.hadoop.fs.Path;
 
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -164,10 +165,8 @@ public class FileSystemStorage implements Storage {
             return "/tmp";
         }
 
-        StringBuilder uriTemplate = new StringBuilder();
-        uriTemplate.append(storageUrl);
-        uriTemplate.append(locationForType.getPath());
-        return uriTemplate.toString();
+        // normalize the path so trailing and double '/' are removed
+        return storageUrl + new Path(locationForType.getPath());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b3b42ef9/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
index a059652..6917472 100644
--- a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
@@ -21,6 +21,7 @@ package org.apache.falcon.entity;
 import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.testng.Assert;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import java.util.ArrayList;
@@ -135,16 +136,29 @@ public class FileSystemStorageTest {
         Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), "${nameNode}/foo/bar");
     }
 
-    @Test
-    public void testGetUriTemplateWithLocationType() throws Exception {
+    @DataProvider(name = "locationTestDataProvider")
+    private Object[][] createLocationTestData() {
+        return new Object[][] {
+            {"hdfs://localhost:41020", "/localDC/rc/billing/ua2", "/localDC/rc/billing/ua2"},
+            {"hdfs://localhost:41020", "/localDC/rc/billing/ua2/", "/localDC/rc/billing/ua2"},
+            {"hdfs://localhost:41020", "/localDC/rc/billing/ua2//", "/localDC/rc/billing/ua2"},
+            {"${nameNode}", "/localDC/rc/billing/ua2", "/localDC/rc/billing/ua2"},
+            {"${nameNode}", "/localDC/rc/billing/ua2/", "/localDC/rc/billing/ua2"},
+            {"${nameNode}", "/localDC/rc/billing/ua2//", "/localDC/rc/billing/ua2"},
+        };
+    }
+
+    @Test (dataProvider = "locationTestDataProvider")
+    public void testGetUriTemplateWithLocationType(String storageUrl, String path,
+                                                   String expected) throws Exception {
         final Location location = new Location();
-        location.setPath("/foo/bar");
+        location.setPath(path);
         location.setType(LocationType.DATA);
         List<Location> locations = new ArrayList<Location>();
         locations.add(location);
 
-        FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations);
-        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), "hdfs://localhost:41020/foo/bar");
+        FileSystemStorage storage = new FileSystemStorage(storageUrl, locations);
+        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), storageUrl + expected);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b3b42ef9/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java
index f00bdd7..058b35c 100644
--- a/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java
@@ -152,6 +152,10 @@ public class FileSystemFeedReplicationIT {
                 .accept(MediaType.APPLICATION_JSON)
                 .get(InstancesResult.class);
         Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED);
+
+        TestContext.executeWithURL("entity -delete -type feed -name " + feedName);
+        TestContext.executeWithURL("entity -delete -type cluster -name primary-cluster");
+        TestContext.executeWithURL("entity -delete -type cluster -name bcp-cluster");
     }
 
     @Test (enabled = false)
@@ -192,7 +196,7 @@ public class FileSystemFeedReplicationIT {
         FileSystem beta = FileSystem.get(ClusterHelper.getConfiguration(targetBetaContext.getCluster().getCluster()));
         Assert.assertTrue(beta.exists(new Path("/falcon/test/target-cluster-beta/customer_beta/" + PARTITION_VALUE)));
 
-        FileSystem gamma = FileSystem.get(ClusterHelper.getConfiguration(targetAlphaContext.getCluster().getCluster()));
+        FileSystem gamma = FileSystem.get(ClusterHelper.getConfiguration(targetGammaContext.getCluster().getCluster()));
         Assert.assertTrue(
                 gamma.exists(new Path("/falcon/test/target-cluster-gamma/customer_gamma/" + PARTITION_VALUE)));
 
@@ -201,5 +205,66 @@ public class FileSystemFeedReplicationIT {
                 .accept(MediaType.APPLICATION_JSON)
                 .get(InstancesResult.class);
         Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED);
+
+        TestContext.executeWithURL("entity -delete -type feed -name " + feedName);
+        TestContext.executeWithURL("entity -delete -type cluster -name primary-cluster");
+        TestContext.executeWithURL("entity -delete -type cluster -name target-cluster-alpha");
+        TestContext.executeWithURL("entity -delete -type cluster -name target-cluster-beta");
+        TestContext.executeWithURL("entity -delete -type cluster -name target-cluster-gamma");
+    }
+
+    @Test (enabled = false)
+    public void testFSReplicationComplex() throws Exception {
+        // copyTestDataToHDFS
+        Cluster sourceCluster = sourceContext.getCluster().getCluster();
+        FileSystem sourceFS = FileSystem.get(ClusterHelper.getConfiguration(sourceCluster));
+        String sourceStorageUrl = ClusterHelper.getStorageUrl(sourceCluster);
+        final String partitionValue = "2012-10-01-12";
+        String sourceLocation = sourceStorageUrl + SOURCE_LOCATION + partitionValue + "/" + sourceCluster.getColo();
+        FSUtils.copyResourceToHDFS("/apps/data/data.txt", "data.txt", sourceLocation);
+        Path sourcePath = new Path(sourceLocation);
+        Assert.assertTrue(sourceFS.exists(sourcePath));
+
+        final Map<String, String> overlay = sourceContext.getUniqueOverlay();
+        String filePath = sourceContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+        filePath = targetContext.overlayParametersOverTemplate("/table/target-cluster-alpha.xml", overlay);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+        filePath = targetContext.overlayParametersOverTemplate("/table/target-cluster-beta.xml", overlay);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+        // verify if the partition on the source exists - precondition
+        Assert.assertTrue(sourceFS.exists(sourcePath));
+
+        filePath = sourceContext.overlayParametersOverTemplate("/table/complex-replicating-feed.xml", overlay);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
+
+        // wait until the workflow job completes
+        final String feedName = "complex-replicating-feed";
+        WorkflowJob jobInfo = OozieTestUtils.getWorkflowJob(targetContext.getCluster().getCluster(),
+                OozieClient.FILTER_NAME + "=FALCON_FEED_REPLICATION_" + feedName);
+        Assert.assertEquals(jobInfo.getStatus(), WorkflowJob.Status.SUCCEEDED);
+
+        Assert.assertTrue(sourceFS.exists(new Path(SOURCE_LOCATION + partitionValue)));
+
+        // verify if the partition on the target exists
+        FileSystem alpha = FileSystem.get(ClusterHelper.getConfiguration(targetAlphaContext.getCluster().getCluster()));
+        Assert.assertTrue(alpha.exists(new Path("/localDC/rc/billing/ua1/" + partitionValue)));
+
+        FileSystem beta = FileSystem.get(ClusterHelper.getConfiguration(targetBetaContext.getCluster().getCluster()));
+        Assert.assertTrue(beta.exists(new Path("/localDC/rc/billing/ua2/" + partitionValue)));
+
+        InstancesResult response = targetContext.getService().path("api/instance/running/feed/" + feedName)
+                .header("Remote-User", "guest")
+                .accept(MediaType.APPLICATION_JSON)
+                .get(InstancesResult.class);
+        Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED);
+
+        TestContext.executeWithURL("entity -delete -type feed -name " + feedName);
+        TestContext.executeWithURL("entity -delete -type cluster -name primary-cluster");
+        TestContext.executeWithURL("entity -delete -type cluster -name target-cluster-alpha");
+        TestContext.executeWithURL("entity -delete -type cluster -name target-cluster-beta");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b3b42ef9/webapp/src/test/resources/table/complex-replicating-feed.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/table/complex-replicating-feed.xml b/webapp/src/test/resources/table/complex-replicating-feed.xml
new file mode 100644
index 0000000..35e8986
--- /dev/null
+++ b/webapp/src/test/resources/table/complex-replicating-feed.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<!--
+    Replicates the hour-partitioned customer_raw data from primary-cluster to the target-cluster-alpha and target-cluster-beta clusters.
+  -->
+<feed description="billing RC File" name="complex-replicating-feed" xmlns="uri:falcon:feed:0.1">
+    <partitions>
+        <partition name="colo"/>
+        <partition name="eventTime"/>
+        <partition name="impressionHour"/>
+        <partition name="pricingModel"/>
+    </partitions>
+
+    <groups>online,bi</groups>
+
+    <frequency>minutes(5)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="minutes(1)"/>
+
+    <clusters>
+        <cluster partition="${cluster.colo}" name="primary-cluster" type="source">
+            <validity end="2099-01-01T00:00Z" start="2012-10-01T12:00Z"/>
+            <retention action="delete" limit="days(10000)"/>
+
+            <locations>
+                <location type="data"
+                          path="/falcon/test/primary-cluster/customer_raw/${YEAR}-${MONTH}-${DAY}-${HOUR}/" />
+            </locations>
+        </cluster>
+        <cluster type="target" name="target-cluster-alpha">
+            <validity end="2012-10-01T12:11Z" start="2012-10-01T12:05Z"/>
+            <retention action="delete" limit="days(10000)"/>
+
+            <locations>
+                <location path="/localDC/rc/billing/ua1/${YEAR}-${MONTH}-${DAY}-${HOUR}/" type="data"/>
+            </locations>
+        </cluster>
+        <cluster type="target" name="target-cluster-beta">
+            <validity end="2012-10-01T12:26Z" start="2012-10-01T12:10Z"/>
+            <retention action="delete" limit="days(10000)"/>
+
+            <locations>
+                <location path="/localDC/rc/billing/ua2/${YEAR}-${MONTH}-${DAY}-${HOUR}/" type="data"/>
+            </locations>
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data" path="/falcon/test/customer_raw/${YEAR}-${MONTH}-${DAY}-${HOUR}/" />
+    </locations>
+
+    <ACL owner="seetharam" group="users" permission="0755"/>
+    <schema location="" provider="hcatalog"/>
+
+</feed>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b3b42ef9/webapp/src/test/resources/table/target-cluster-alpha.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/table/target-cluster-alpha.xml b/webapp/src/test/resources/table/target-cluster-alpha.xml
index c62303e..528fd41 100644
--- a/webapp/src/test/resources/table/target-cluster-alpha.xml
+++ b/webapp/src/test/resources/table/target-cluster-alpha.xml
@@ -19,7 +19,7 @@
 <!--
     target-cluster-alpha configuration for demo vm
   -->
-<cluster colo="east-coast" description="BCP Cluster"
+<cluster colo="ua1" description="BCP Cluster"
          name="target-cluster-alpha"
          xmlns="uri:falcon:cluster:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b3b42ef9/webapp/src/test/resources/table/target-cluster-beta.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/table/target-cluster-beta.xml b/webapp/src/test/resources/table/target-cluster-beta.xml
index 3738575..d2c808a 100644
--- a/webapp/src/test/resources/table/target-cluster-beta.xml
+++ b/webapp/src/test/resources/table/target-cluster-beta.xml
@@ -19,7 +19,7 @@
 <!--
     target-cluster-beta configuration for demo vm
   -->
-<cluster colo="east-coast" description="BCP Cluster"
+<cluster colo="ua2" description="BCP Cluster"
          name="target-cluster-beta"
          xmlns="uri:falcon:cluster:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">