Posted to commits@falcon.apache.org by ve...@apache.org on 2013/11/02 00:42:08 UTC

[07/13] FALCON-95 Enable embedding Hive scripts directly in a process. Contributed by Venkatesh Seetharam
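
For context, this change lets a Falcon process reference a Hive script as its workflow, in the same way Pig scripts can be referenced. A minimal sketch of such a process entity follows; the entity name, cluster, schedule, and script path are hypothetical and only the engine="hive" workflow element illustrates the feature this patch introduces.

    <process name="hive-process" xmlns="uri:falcon:process:0.1">
        <clusters>
            <cluster name="primary-cluster">
                <validity start="2013-11-01T00:00Z" end="2013-12-01T00:00Z"/>
            </cluster>
        </clusters>
        <parallel>1</parallel>
        <order>FIFO</order>
        <frequency>days(1)</frequency>
        <timezone>UTC</timezone>
        <!-- The workflow engine is set to "hive"; path points to the .hql script on HDFS (illustrative path). -->
        <workflow engine="hive" path="/apps/hive/scripts/script.hql"/>
        <retry policy="periodic" delay="minutes(10)" attempts="3"/>
    </process>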

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/webapp/src/test/java/org/apache/falcon/catalog/TableStorageFeedEvictorIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/catalog/TableStorageFeedEvictorIT.java b/webapp/src/test/java/org/apache/falcon/catalog/TableStorageFeedEvictorIT.java
deleted file mode 100644
index bddb3fa..0000000
--- a/webapp/src/test/java/org/apache/falcon/catalog/TableStorageFeedEvictorIT.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.catalog;
-
-import org.apache.commons.el.ExpressionEvaluatorImpl;
-import org.apache.falcon.Pair;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.retention.FeedEvictor;
-import org.apache.falcon.util.HiveTestUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hcatalog.api.HCatAddPartitionDesc;
-import org.apache.hcatalog.api.HCatClient;
-import org.apache.hcatalog.api.HCatPartition;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import javax.servlet.jsp.el.ELException;
-import javax.servlet.jsp.el.ExpressionEvaluator;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TimeZone;
-
-/**
- * Test for FeedEvictor for table.
- */
-public class TableStorageFeedEvictorIT {
-
-    private static final ExpressionEvaluator EVALUATOR = new ExpressionEvaluatorImpl();
-    private static final ExpressionHelper RESOLVER = ExpressionHelper.get();
-
-    private static final String METASTORE_URL = "thrift://localhost:49083/";
-    private static final String DATABASE_NAME = "falcon_db";
-    private static final String TABLE_NAME = "clicks";
-    private static final String EXTERNAL_TABLE_NAME = "clicks_external";
-    private static final String EXTERNAL_TABLE_LOCATION = "hdfs://localhost:41020/falcon/staging/clicks_external/";
-    private static final String FORMAT = "yyyyMMddHHmm";
-
-    private final InMemoryWriter stream = new InMemoryWriter(System.out);
-
-    private HCatClient client;
-
-    @BeforeClass
-    public void setUp() throws Exception {
-        FeedEvictor.OUT.set(stream);
-
-        client = HiveCatalogService.get(METASTORE_URL);
-
-        HiveTestUtils.createDatabase(METASTORE_URL, DATABASE_NAME);
-        final List<String> partitionKeys = Arrays.asList("ds", "region");
-        HiveTestUtils.createTable(METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionKeys);
-        HiveTestUtils.createExternalTable(METASTORE_URL, DATABASE_NAME, EXTERNAL_TABLE_NAME,
-                partitionKeys, EXTERNAL_TABLE_LOCATION);
-    }
-
-    @AfterClass
-    public void close() throws Exception {
-        HiveTestUtils.dropTable(METASTORE_URL, DATABASE_NAME, EXTERNAL_TABLE_NAME);
-        HiveTestUtils.dropTable(METASTORE_URL, DATABASE_NAME, TABLE_NAME);
-        HiveTestUtils.dropDatabase(METASTORE_URL, DATABASE_NAME);
-    }
-
-    @DataProvider (name = "retentionLimitDataProvider")
-    private Object[][] createRetentionLimitData() {
-        return new Object[][] {
-            {"days(10)", 4},
-            {"days(100)", 7},
-        };
-    }
-
-    @Test (dataProvider = "retentionLimitDataProvider")
-    public void testFeedEvictorForTable(String retentionLimit, int expectedSize) throws Exception {
-
-        final String timeZone = "UTC";
-        final String dateMask = "yyyyMMdd";
-
-        Pair<Date, Date> range = getDateRange(retentionLimit);
-        List<String> candidatePartitions = getCandidatePartitions("days(10)", dateMask, timeZone, 3);
-        addPartitions(TABLE_NAME, candidatePartitions, false);
-
-        try {
-            stream.clear();
-
-            final String tableUri = DATABASE_NAME + "/" + TABLE_NAME + "/ds=${YEAR}${MONTH}${DAY};region=us";
-            String feedBasePath = METASTORE_URL + tableUri;
-            String logFile = "hdfs://localhost:41020/falcon/staging/feed/instancePaths-2013-09-13-01-00.csv";
-
-            FeedEvictor.main(new String[]{
-                "-feedBasePath", feedBasePath,
-                "-retentionType", "instance",
-                "-retentionLimit", retentionLimit,
-                "-timeZone", timeZone,
-                "-frequency", "daily",
-                "-logFile", logFile,
-                "-falconFeedStorageType", Storage.TYPE.TABLE.name(),
-            });
-
-            List<HCatPartition> partitions = client.getPartitions(DATABASE_NAME, TABLE_NAME);
-            Assert.assertEquals(partitions.size(), expectedSize, "Unexpected number of partitions");
-
-            Assert.assertEquals(readLogFile(new Path(logFile)),
-                    getExpectedInstancePaths(candidatePartitions, range.first, dateMask, timeZone));
-
-        } catch (Exception e) {
-            Assert.fail("Unknown exception", e);
-        } finally {
-            dropPartitions(TABLE_NAME, candidatePartitions);
-        }
-    }
-
-    @Test (dataProvider = "retentionLimitDataProvider")
-    public void testFeedEvictorForExternalTable(String retentionLimit, int expectedSize) throws Exception {
-
-        final String timeZone = "UTC";
-        final String dateMask = "yyyyMMdd";
-
-        Pair<Date, Date> range = getDateRange(retentionLimit);
-        List<String> candidatePartitions = getCandidatePartitions("days(10)", dateMask, timeZone, 3);
-        addPartitions(EXTERNAL_TABLE_NAME, candidatePartitions, true);
-
-        try {
-            stream.clear();
-
-            final String tableUri = DATABASE_NAME + "/" + EXTERNAL_TABLE_NAME + "/ds=${YEAR}${MONTH}${DAY};region=us";
-            String feedBasePath = METASTORE_URL + tableUri;
-            String logFile = "hdfs://localhost:41020/falcon/staging/feed/instancePaths-2013-09-13-01-00.csv";
-
-            FeedEvictor.main(new String[]{
-                "-feedBasePath", feedBasePath,
-                "-retentionType", "instance",
-                "-retentionLimit", retentionLimit,
-                "-timeZone", timeZone,
-                "-frequency", "daily",
-                "-logFile", logFile,
-                "-falconFeedStorageType", Storage.TYPE.TABLE.name(),
-            });
-
-            List<HCatPartition> partitions = client.getPartitions(DATABASE_NAME, EXTERNAL_TABLE_NAME);
-            Assert.assertEquals(partitions.size(), expectedSize, "Unexpected number of partitions");
-
-            Assert.assertEquals(readLogFile(new Path(logFile)),
-                    getExpectedInstancePaths(candidatePartitions, range.first, dateMask, timeZone));
-
-            verifyFSPartitionsAreDeleted(candidatePartitions, range.first, dateMask, timeZone);
-
-        } catch (Exception e) {
-            Assert.fail("Unknown exception", e);
-        } finally {
-            dropPartitions(EXTERNAL_TABLE_NAME, candidatePartitions);
-        }
-    }
-
-    public List<String> getCandidatePartitions(String retentionLimit, String dateMask,
-                                               String timeZone, int limit) throws Exception {
-        List<String> partitions = new ArrayList<String>();
-
-        Pair<Date, Date> range = getDateRange(retentionLimit);
-
-        DateFormat dateFormat = new SimpleDateFormat(FORMAT.substring(0, dateMask.length()));
-        dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
-
-        String startDate = dateFormat.format(range.first);
-        partitions.add(startDate);
-
-        Calendar calendar = Calendar.getInstance();
-        calendar.setTime(range.first);
-        for (int i = 1; i <= limit; i++) {
-            calendar.add(Calendar.DAY_OF_MONTH, -(i + 1));
-            partitions.add(dateFormat.format(calendar.getTime()));
-        }
-
-        calendar.setTime(range.second);
-        for (int i = 1; i <= limit; i++) {
-            calendar.add(Calendar.DAY_OF_MONTH, -(i + 1));
-            partitions.add(dateFormat.format(calendar.getTime()));
-        }
-
-        return partitions;
-    }
-
-    private Pair<Date, Date> getDateRange(String period) throws ELException {
-        Long duration = (Long) EVALUATOR.evaluate("${" + period + "}",
-                Long.class, RESOLVER, RESOLVER);
-        Date end = new Date();
-        Date start = new Date(end.getTime() - duration);
-        return Pair.of(start, end);
-    }
-
-    private void addPartitions(String tableName, List<String> candidatePartitions,
-                               boolean isTableExternal) throws Exception {
-        Path path = new Path(EXTERNAL_TABLE_LOCATION);
-        FileSystem fs = path.getFileSystem(new Configuration());
-
-        for (String candidatePartition : candidatePartitions) {
-            if (isTableExternal) {
-                touch(fs, EXTERNAL_TABLE_LOCATION + candidatePartition);
-            }
-
-            Map<String, String> partition = new HashMap<String, String>();
-            partition.put("ds", candidatePartition); //yyyyMMDD
-            partition.put("region", "in");
-            HCatAddPartitionDesc addPtn = HCatAddPartitionDesc.create(
-                    DATABASE_NAME, tableName, null, partition).build();
-            client.addPartition(addPtn);
-        }
-    }
-
-    private void touch(FileSystem fs, String path) throws Exception {
-        fs.create(new Path(path)).close();
-    }
-
-    private void dropPartitions(String tableName, List<String> candidatePartitions) throws Exception {
-
-        for (String candidatePartition : candidatePartitions) {
-            Map<String, String> partition = new HashMap<String, String>();
-            partition.put("ds", candidatePartition); //yyyyMMDD
-            partition.put("region", "in");
-            client.dropPartitions(DATABASE_NAME, tableName, partition, true);
-        }
-    }
-
-    public String getExpectedInstancePaths(List<String> candidatePartitions, Date date,
-                                           String dateMask, String timeZone) {
-        Collections.sort(candidatePartitions);
-        StringBuilder instances = new StringBuilder("instancePaths=");
-
-        DateFormat dateFormat = new SimpleDateFormat(FORMAT.substring(0, dateMask.length()));
-        dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
-        String startDate = dateFormat.format(date);
-
-        for (String candidatePartition : candidatePartitions) {
-            if (candidatePartition.compareTo(startDate) < 0) {
-                instances.append("[")
-                        .append(candidatePartition)
-                        .append(", in],");
-            }
-        }
-
-        return instances.toString();
-    }
-
-    private void verifyFSPartitionsAreDeleted(List<String> candidatePartitions, Date date,
-                                              String dateMask, String timeZone) throws IOException {
-
-        FileSystem fs = new Path(EXTERNAL_TABLE_LOCATION).getFileSystem(new Configuration());
-
-        DateFormat dateFormat = new SimpleDateFormat(FORMAT.substring(0, dateMask.length()));
-        dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
-        String startDate = dateFormat.format(date);
-
-        Collections.sort(candidatePartitions);
-        for (String candidatePartition : candidatePartitions) {
-            final String path = EXTERNAL_TABLE_LOCATION + "ds=" + candidatePartition + "/region=in";
-            if (candidatePartition.compareTo(startDate) < 0
-                    && fs.exists(new Path(path))) {
-                Assert.fail("Expecting " + path + " to be deleted");
-            }
-        }
-    }
-
-    private String readLogFile(Path logFile) throws IOException {
-        ByteArrayOutputStream writer = new ByteArrayOutputStream();
-        InputStream date = logFile.getFileSystem(new Configuration()).open(logFile);
-        IOUtils.copyBytes(date, writer, 4096, true);
-        return writer.toString();
-    }
-
-    private static class InMemoryWriter extends PrintStream {
-
-        private final StringBuffer buffer = new StringBuffer();
-
-        public InMemoryWriter(OutputStream out) {
-            super(out);
-        }
-
-        @Override
-        public void println(String x) {
-            buffer.append(x);
-            super.println(x);
-        }
-
-        public String getBuffer() {
-            return buffer.toString();
-        }
-
-        public void clear() {
-            buffer.delete(0, buffer.length());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/webapp/src/test/java/org/apache/falcon/catalog/TableStorageFeedReplicationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/catalog/TableStorageFeedReplicationIT.java b/webapp/src/test/java/org/apache/falcon/catalog/TableStorageFeedReplicationIT.java
deleted file mode 100644
index a736ea1..0000000
--- a/webapp/src/test/java/org/apache/falcon/catalog/TableStorageFeedReplicationIT.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.catalog;
-
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.resource.APIResult;
-import org.apache.falcon.resource.InstancesResult;
-import org.apache.falcon.resource.TestContext;
-import org.apache.falcon.util.HiveTestUtils;
-import org.apache.falcon.util.OozieTestUtils;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hcatalog.api.HCatPartition;
-import org.apache.oozie.client.OozieClient;
-import org.apache.oozie.client.WorkflowJob;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import javax.ws.rs.core.MediaType;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Integration tests for Feed Replication with Table storage.
- *
- * This test is disabled as it heavily depends on oozie sharelibs for
- * hcatalog being made available on HDFS. captured in FALCON-139.
- */
-@Test (enabled = false)
-public class TableStorageFeedReplicationIT {
-
-    private static final String SOURCE_DATABASE_NAME = "src_demo_db";
-    private static final String SOURCE_TABLE_NAME = "customer_raw";
-
-    private static final String TARGET_DATABASE_NAME = "tgt_demo_db";
-    private static final String TARGET_TABLE_NAME = "customer_bcp";
-
-    private static final String PARTITION_VALUE = "2013-09-24-00"; // ${YEAR}-${MONTH}-${DAY}-${HOUR}
-
-    private final TestContext sourceContext = new TestContext();
-    private String sourceMetastoreUrl;
-
-    private final TestContext targetContext = new TestContext();
-    private String targetMetastoreUrl;
-
-
-    @BeforeClass
-    public void setUp() throws Exception {
-        TestContext.cleanupStore();
-
-        Map<String, String> overlay = sourceContext.getUniqueOverlay();
-        String sourceFilePath = sourceContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
-        sourceContext.setCluster(sourceFilePath);
-
-        final Cluster sourceCluster = sourceContext.getCluster().getCluster();
-        String sourceStorageUrl = ClusterHelper.getStorageUrl(sourceCluster);
-
-        // copyTestDataToHDFS
-        final String sourcePath = sourceStorageUrl + "/falcon/test/input/" + PARTITION_VALUE;
-        TestContext.copyResourceToHDFS("/apps/pig/data.txt", "data.txt", sourcePath);
-
-        sourceMetastoreUrl = ClusterHelper.getInterface(sourceCluster, Interfacetype.REGISTRY).getEndpoint();
-        setupHiveMetastore(sourceMetastoreUrl, SOURCE_DATABASE_NAME, SOURCE_TABLE_NAME);
-        HiveTestUtils.loadData(sourceMetastoreUrl, SOURCE_DATABASE_NAME, SOURCE_TABLE_NAME, sourcePath,
-                PARTITION_VALUE);
-
-        String targetFilePath = targetContext.overlayParametersOverTemplate("/table/bcp-cluster.xml", overlay);
-        targetContext.setCluster(targetFilePath);
-
-        final Cluster targetCluster = targetContext.getCluster().getCluster();
-        targetMetastoreUrl = ClusterHelper.getInterface(targetCluster, Interfacetype.REGISTRY).getEndpoint();
-        setupHiveMetastore(targetMetastoreUrl, TARGET_DATABASE_NAME, TARGET_TABLE_NAME);
-
-        // set up kahadb to be sent as part of workflows
-        StartupProperties.get().setProperty("libext.paths", "./target/libext");
-        String libext = ClusterHelper.getLocation(targetCluster, "working") + "/libext";
-        String targetStorageUrl = ClusterHelper.getStorageUrl(targetCluster);
-        TestContext.copyOozieShareLibsToHDFS("./target/libext", targetStorageUrl + libext);
-    }
-
-    private void setupHiveMetastore(String metastoreUrl, String databaseName,
-                                    String tableName) throws Exception {
-        cleanupHiveMetastore(metastoreUrl, databaseName, tableName);
-
-        HiveTestUtils.createDatabase(metastoreUrl, databaseName);
-        final List<String> partitionKeys = Arrays.asList("ds");
-        HiveTestUtils.createTable(metastoreUrl, databaseName, tableName, partitionKeys);
-        // todo this is a kludge to work around hive's limitations
-        HiveTestUtils.alterTable(metastoreUrl, databaseName, tableName);
-    }
-
-    @AfterClass
-    public void tearDown() throws Exception {
-        TestContext.executeWithURL("entity -delete -type feed -name customer-table-replicating-feed");
-        TestContext.executeWithURL("entity -delete -type cluster -name primary-cluster");
-        TestContext.executeWithURL("entity -delete -type cluster -name bcp-cluster");
-
-        cleanupHiveMetastore(sourceMetastoreUrl, SOURCE_DATABASE_NAME, SOURCE_TABLE_NAME);
-        cleanupHiveMetastore(targetMetastoreUrl, TARGET_DATABASE_NAME, TARGET_TABLE_NAME);
-
-        cleanupStagingDirs(sourceContext.getCluster().getCluster(), SOURCE_DATABASE_NAME);
-        cleanupStagingDirs(targetContext.getCluster().getCluster(), TARGET_DATABASE_NAME);
-    }
-
-    private void cleanupHiveMetastore(String metastoreUrl, String databaseName, String tableName) throws Exception {
-        HiveTestUtils.dropTable(metastoreUrl, databaseName, tableName);
-        HiveTestUtils.dropDatabase(metastoreUrl, databaseName);
-    }
-
-    private void cleanupStagingDirs(Cluster cluster, String databaseName) throws IOException {
-        FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(cluster));
-        String stagingDir = "/apps/falcon/staging/"
-                + "FALCON_FEED_REPLICATION_customer-table-replicating-feed_primary-cluster/"
-                + databaseName;
-        fs.delete(new Path(stagingDir), true);
-    }
-
-    @Test (enabled = false)
-    public void testTableReplication() throws Exception {
-        final String feedName = "customer-table-replicating-feed";
-        final Map<String, String> overlay = sourceContext.getUniqueOverlay();
-        String filePath = sourceContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
-        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
-
-        filePath = targetContext.overlayParametersOverTemplate("/table/bcp-cluster.xml", overlay);
-        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
-
-        HCatPartition sourcePartition = HiveTestUtils.getPartition(
-                sourceMetastoreUrl, SOURCE_DATABASE_NAME, SOURCE_TABLE_NAME, "ds", PARTITION_VALUE);
-        Assert.assertNotNull(sourcePartition);
-
-        filePath = sourceContext.overlayParametersOverTemplate("/table/customer-table-replicating-feed.xml", overlay);
-        Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
-
-        // wait until the workflow job completes
-        WorkflowJob jobInfo = OozieTestUtils.getWorkflowJob(targetContext.getCluster().getCluster(),
-                OozieClient.FILTER_NAME + "=FALCON_FEED_REPLICATION_" + feedName);
-        Assert.assertEquals(WorkflowJob.Status.SUCCEEDED, jobInfo.getStatus());
-
-        // verify if the partition on the target exists
-        HCatPartition targetPartition = HiveTestUtils.getPartition(
-                targetMetastoreUrl, TARGET_DATABASE_NAME, TARGET_TABLE_NAME, "ds", PARTITION_VALUE);
-        Assert.assertNotNull(targetPartition);
-
-        InstancesResult response = targetContext.getService().path("api/instance/running/feed/" + feedName)
-                .header("Remote-User", "guest")
-                .accept(MediaType.APPLICATION_JSON)
-                .get(InstancesResult.class);
-        Assert.assertEquals(APIResult.Status.SUCCEEDED, response.getStatus());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java
new file mode 100644
index 0000000..f00bdd7
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle;
+
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.util.FSUtils;
+import org.apache.falcon.util.OozieTestUtils;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowJob;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Integration tests for Feed Replication with FileSystem storage.
+ *
+ * This test is disabled as it heavily depends on Oozie sharelibs for
+ * HCatalog being made available on HDFS. This is captured in FALCON-139.
+ */
+@Test (enabled = false)
+public class FileSystemFeedReplicationIT {
+
+    private static final String PARTITION_VALUE = "2013-10-24-00"; // ${YEAR}-${MONTH}-${DAY}-${HOUR}
+    private static final String SOURCE_LOCATION = "/falcon/test/primary-cluster/customer_raw/";
+    private static final String TARGET_LOCATION = "/falcon/test/bcp-cluster/customer_bcp/";
+
+    private final TestContext sourceContext = new TestContext();
+    private final TestContext targetContext = new TestContext();
+
+    private final TestContext targetAlphaContext = new TestContext();
+    private final TestContext targetBetaContext = new TestContext();
+    private final TestContext targetGammaContext = new TestContext();
+
+    @BeforeClass
+    public void setUp() throws Exception {
+        TestContext.cleanupStore();
+
+        Map<String, String> overlay = sourceContext.getUniqueOverlay();
+        String sourceFilePath = sourceContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
+        sourceContext.setCluster(sourceFilePath);
+
+        final Cluster sourceCluster = sourceContext.getCluster().getCluster();
+        String sourceStorageUrl = ClusterHelper.getStorageUrl(sourceCluster);
+
+        // copyTestDataToHDFS
+        final String sourcePath = sourceStorageUrl + SOURCE_LOCATION + PARTITION_VALUE;
+        FSUtils.copyResourceToHDFS("/apps/data/data.txt", "data.txt", sourcePath);
+
+        String targetFilePath = targetContext.overlayParametersOverTemplate("/table/bcp-cluster.xml", overlay);
+        targetContext.setCluster(targetFilePath);
+
+        final Cluster targetCluster = targetContext.getCluster().getCluster();
+        copyLibsToHDFS(targetCluster);
+
+        String file = targetAlphaContext.overlayParametersOverTemplate("/table/target-cluster-alpha.xml", overlay);
+        targetAlphaContext.setCluster(file);
+        copyLibsToHDFS(targetAlphaContext.getCluster().getCluster());
+
+        file = targetBetaContext.overlayParametersOverTemplate("/table/target-cluster-beta.xml", overlay);
+        targetBetaContext.setCluster(file);
+        copyLibsToHDFS(targetBetaContext.getCluster().getCluster());
+
+        file = targetGammaContext.overlayParametersOverTemplate("/table/target-cluster-gamma.xml", overlay);
+        targetGammaContext.setCluster(file);
+        copyLibsToHDFS(targetGammaContext.getCluster().getCluster());
+    }
+
+    private void copyLibsToHDFS(Cluster cluster) throws IOException {
+        // set up kahadb to be sent as part of workflows
+        StartupProperties.get().setProperty("libext.paths", "./target/libext");
+        String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
+        String targetStorageUrl = ClusterHelper.getStorageUrl(cluster);
+        FSUtils.copyOozieShareLibsToHDFS("./target/libext", targetStorageUrl + libext);
+    }
+
+    @AfterClass
+    public void tearDown() throws Exception {
+        TestContext.executeWithURL("entity -delete -type feed -name customer-fs-replicating-feed");
+        TestContext.executeWithURL("entity -delete -type cluster -name primary-cluster");
+        TestContext.executeWithURL("entity -delete -type cluster -name bcp-cluster");
+
+        cleanupStagingDirs(sourceContext.getCluster().getCluster());
+        cleanupStagingDirs(targetContext.getCluster().getCluster());
+
+        cleanupStagingDirs(targetAlphaContext.getCluster().getCluster());
+        cleanupStagingDirs(targetBetaContext.getCluster().getCluster());
+        cleanupStagingDirs(targetGammaContext.getCluster().getCluster());
+    }
+
+    private void cleanupStagingDirs(Cluster cluster) throws IOException {
+        FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(cluster));
+        fs.delete(new Path("/falcon/test"), true);
+    }
+
+    @Test (enabled = false)
+    public void testFSReplicationSingleSourceToTarget() throws Exception {
+        final Map<String, String> overlay = sourceContext.getUniqueOverlay();
+        String filePath = sourceContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+        filePath = targetContext.overlayParametersOverTemplate("/table/bcp-cluster.xml", overlay);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+        // verify if the partition on the source exists - precondition
+        FileSystem sourceFS = FileSystem.get(ClusterHelper.getConfiguration(sourceContext.getCluster().getCluster()));
+        Assert.assertTrue(sourceFS.exists(new Path(SOURCE_LOCATION + PARTITION_VALUE)));
+
+        filePath = sourceContext.overlayParametersOverTemplate("/table/customer-fs-replicating-feed.xml", overlay);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
+
+        // wait until the workflow job completes
+        final String feedName = "customer-fs-replicating-feed";
+        WorkflowJob jobInfo = OozieTestUtils.getWorkflowJob(targetContext.getCluster().getCluster(),
+                OozieClient.FILTER_NAME + "=FALCON_FEED_REPLICATION_" + feedName);
+        Assert.assertEquals(jobInfo.getStatus(), WorkflowJob.Status.SUCCEEDED);
+
+        Assert.assertTrue(sourceFS.exists(new Path(SOURCE_LOCATION + PARTITION_VALUE)));
+        // verify if the partition on the target exists
+        FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(targetContext.getCluster().getCluster()));
+        Assert.assertTrue(fs.exists(new Path(TARGET_LOCATION + PARTITION_VALUE)));
+
+        InstancesResult response = targetContext.getService().path("api/instance/running/feed/" + feedName)
+                .header("Remote-User", "guest")
+                .accept(MediaType.APPLICATION_JSON)
+                .get(InstancesResult.class);
+        Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED);
+    }
+
+    @Test (enabled = false)
+    public void testFSReplicationSingleSourceToMultipleTargets() throws Exception {
+        final Map<String, String> overlay = sourceContext.getUniqueOverlay();
+        String filePath = sourceContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+        filePath = targetContext.overlayParametersOverTemplate("/table/target-cluster-alpha.xml", overlay);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+        filePath = targetContext.overlayParametersOverTemplate("/table/target-cluster-beta.xml", overlay);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+        filePath = targetContext.overlayParametersOverTemplate("/table/target-cluster-gamma.xml", overlay);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+        // verify if the partition on the source exists - precondition
+        FileSystem sourceFS = FileSystem.get(ClusterHelper.getConfiguration(sourceContext.getCluster().getCluster()));
+        Assert.assertTrue(sourceFS.exists(new Path(SOURCE_LOCATION + PARTITION_VALUE)));
+
+        filePath = sourceContext.overlayParametersOverTemplate("/table/multiple-targets-replicating-feed.xml", overlay);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
+
+        // wait until the workflow job completes
+        final String feedName = "multiple-targets-replicating-feed";
+        WorkflowJob jobInfo = OozieTestUtils.getWorkflowJob(targetContext.getCluster().getCluster(),
+                OozieClient.FILTER_NAME + "=FALCON_FEED_REPLICATION_" + feedName);
+        Assert.assertEquals(jobInfo.getStatus(), WorkflowJob.Status.SUCCEEDED);
+
+        Assert.assertTrue(sourceFS.exists(new Path(SOURCE_LOCATION + PARTITION_VALUE)));
+
+        // verify if the partition on the target exists
+        FileSystem alpha = FileSystem.get(ClusterHelper.getConfiguration(targetAlphaContext.getCluster().getCluster()));
+        Assert.assertTrue(
+                alpha.exists(new Path("/falcon/test/target-cluster-alpha/customer_alpha/" + PARTITION_VALUE)));
+
+        FileSystem beta = FileSystem.get(ClusterHelper.getConfiguration(targetBetaContext.getCluster().getCluster()));
+        Assert.assertTrue(beta.exists(new Path("/falcon/test/target-cluster-beta/customer_beta/" + PARTITION_VALUE)));
+
+        FileSystem gamma = FileSystem.get(ClusterHelper.getConfiguration(targetGammaContext.getCluster().getCluster()));
+        Assert.assertTrue(
+                gamma.exists(new Path("/falcon/test/target-cluster-gamma/customer_gamma/" + PARTITION_VALUE)));
+
+        InstancesResult response = targetContext.getService().path("api/instance/running/feed/" + feedName)
+                .header("Remote-User", "guest")
+                .accept(MediaType.APPLICATION_JSON)
+                .get(InstancesResult.class);
+        Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
new file mode 100644
index 0000000..4d8ced0
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle;
+
+import org.apache.commons.el.ExpressionEvaluatorImpl;
+import org.apache.falcon.Pair;
+import org.apache.falcon.catalog.HiveCatalogService;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.retention.FeedEvictor;
+import org.apache.falcon.util.HiveTestUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hcatalog.api.HCatAddPartitionDesc;
+import org.apache.hcatalog.api.HCatClient;
+import org.apache.hcatalog.api.HCatPartition;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import javax.servlet.jsp.el.ELException;
+import javax.servlet.jsp.el.ExpressionEvaluator;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+/**
+ * Integration test for FeedEvictor with table storage.
+ */
+public class TableStorageFeedEvictorIT {
+
+    private static final ExpressionEvaluator EVALUATOR = new ExpressionEvaluatorImpl();
+    private static final ExpressionHelper RESOLVER = ExpressionHelper.get();
+
+    private static final String METASTORE_URL = "thrift://localhost:49083/";
+    private static final String DATABASE_NAME = "falcon_db";
+    private static final String TABLE_NAME = "clicks";
+    private static final String EXTERNAL_TABLE_NAME = "clicks_external";
+    private static final String EXTERNAL_TABLE_LOCATION = "hdfs://localhost:41020/falcon/staging/clicks_external/";
+    private static final String FORMAT = "yyyyMMddHHmm";
+
+    private final InMemoryWriter stream = new InMemoryWriter(System.out);
+
+    private HCatClient client;
+
+    @BeforeClass
+    public void setUp() throws Exception {
+        FeedEvictor.OUT.set(stream);
+
+        client = HiveCatalogService.get(METASTORE_URL);
+
+        HiveTestUtils.createDatabase(METASTORE_URL, DATABASE_NAME);
+        final List<String> partitionKeys = Arrays.asList("ds", "region");
+        HiveTestUtils.createTable(METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionKeys);
+        HiveTestUtils.createExternalTable(METASTORE_URL, DATABASE_NAME, EXTERNAL_TABLE_NAME,
+                partitionKeys, EXTERNAL_TABLE_LOCATION);
+    }
+
+    @AfterClass
+    public void close() throws Exception {
+        HiveTestUtils.dropTable(METASTORE_URL, DATABASE_NAME, EXTERNAL_TABLE_NAME);
+        HiveTestUtils.dropTable(METASTORE_URL, DATABASE_NAME, TABLE_NAME);
+        HiveTestUtils.dropDatabase(METASTORE_URL, DATABASE_NAME);
+    }
+
+    @DataProvider (name = "retentionLimitDataProvider")
+    private Object[][] createRetentionLimitData() {
+        return new Object[][] {
+            {"days(10)", 4},
+            {"days(100)", 7},
+        };
+    }
+
+    @Test (dataProvider = "retentionLimitDataProvider")
+    public void testFeedEvictorForTable(String retentionLimit, int expectedSize) throws Exception {
+
+        final String timeZone = "UTC";
+        final String dateMask = "yyyyMMdd";
+
+        Pair<Date, Date> range = getDateRange(retentionLimit);
+        List<String> candidatePartitions = getCandidatePartitions("days(10)", dateMask, timeZone, 3);
+        addPartitions(TABLE_NAME, candidatePartitions, false);
+
+        try {
+            stream.clear();
+
+            final String tableUri = DATABASE_NAME + "/" + TABLE_NAME + "/ds=${YEAR}${MONTH}${DAY};region=us";
+            String feedBasePath = METASTORE_URL + tableUri;
+            String logFile = "hdfs://localhost:41020/falcon/staging/feed/instancePaths-2013-09-13-01-00.csv";
+
+            FeedEvictor.main(new String[]{
+                "-feedBasePath", feedBasePath,
+                "-retentionType", "instance",
+                "-retentionLimit", retentionLimit,
+                "-timeZone", timeZone,
+                "-frequency", "daily",
+                "-logFile", logFile,
+                "-falconFeedStorageType", Storage.TYPE.TABLE.name(),
+            });
+
+            List<HCatPartition> partitions = client.getPartitions(DATABASE_NAME, TABLE_NAME);
+            Assert.assertEquals(partitions.size(), expectedSize, "Unexpected number of partitions");
+
+            Assert.assertEquals(readLogFile(new Path(logFile)),
+                    getExpectedInstancePaths(candidatePartitions, range.first, dateMask, timeZone));
+
+        } catch (Exception e) {
+            Assert.fail("Unknown exception", e);
+        } finally {
+            dropPartitions(TABLE_NAME, candidatePartitions);
+        }
+    }
+
+    @Test (dataProvider = "retentionLimitDataProvider")
+    public void testFeedEvictorForExternalTable(String retentionLimit, int expectedSize) throws Exception {
+
+        final String timeZone = "UTC";
+        final String dateMask = "yyyyMMdd";
+
+        Pair<Date, Date> range = getDateRange(retentionLimit);
+        List<String> candidatePartitions = getCandidatePartitions("days(10)", dateMask, timeZone, 3);
+        addPartitions(EXTERNAL_TABLE_NAME, candidatePartitions, true);
+
+        try {
+            stream.clear();
+
+            final String tableUri = DATABASE_NAME + "/" + EXTERNAL_TABLE_NAME + "/ds=${YEAR}${MONTH}${DAY};region=us";
+            String feedBasePath = METASTORE_URL + tableUri;
+            String logFile = "hdfs://localhost:41020/falcon/staging/feed/instancePaths-2013-09-13-01-00.csv";
+
+            FeedEvictor.main(new String[]{
+                "-feedBasePath", feedBasePath,
+                "-retentionType", "instance",
+                "-retentionLimit", retentionLimit,
+                "-timeZone", timeZone,
+                "-frequency", "daily",
+                "-logFile", logFile,
+                "-falconFeedStorageType", Storage.TYPE.TABLE.name(),
+            });
+
+            List<HCatPartition> partitions = client.getPartitions(DATABASE_NAME, EXTERNAL_TABLE_NAME);
+            Assert.assertEquals(partitions.size(), expectedSize, "Unexpected number of partitions");
+
+            Assert.assertEquals(readLogFile(new Path(logFile)),
+                    getExpectedInstancePaths(candidatePartitions, range.first, dateMask, timeZone));
+
+            verifyFSPartitionsAreDeleted(candidatePartitions, range.first, dateMask, timeZone);
+
+        } catch (Exception e) {
+            Assert.fail("Unknown exception", e);
+        } finally {
+            dropPartitions(EXTERNAL_TABLE_NAME, candidatePartitions);
+        }
+    }
+
+    public List<String> getCandidatePartitions(String retentionLimit, String dateMask,
+                                               String timeZone, int limit) throws Exception {
+        List<String> partitions = new ArrayList<String>();
+
+        Pair<Date, Date> range = getDateRange(retentionLimit);
+
+        DateFormat dateFormat = new SimpleDateFormat(FORMAT.substring(0, dateMask.length()));
+        dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
+
+        String startDate = dateFormat.format(range.first);
+        partitions.add(startDate);
+
+        Calendar calendar = Calendar.getInstance();
+        calendar.setTime(range.first);
+        for (int i = 1; i <= limit; i++) {
+            calendar.add(Calendar.DAY_OF_MONTH, -(i + 1));
+            partitions.add(dateFormat.format(calendar.getTime()));
+        }
+
+        calendar.setTime(range.second);
+        for (int i = 1; i <= limit; i++) {
+            calendar.add(Calendar.DAY_OF_MONTH, -(i + 1));
+            partitions.add(dateFormat.format(calendar.getTime()));
+        }
+
+        return partitions;
+    }
+
+    private Pair<Date, Date> getDateRange(String period) throws ELException {
+        Long duration = (Long) EVALUATOR.evaluate("${" + period + "}",
+                Long.class, RESOLVER, RESOLVER);
+        Date end = new Date();
+        Date start = new Date(end.getTime() - duration);
+        return Pair.of(start, end);
+    }
+
+    private void addPartitions(String tableName, List<String> candidatePartitions,
+                               boolean isTableExternal) throws Exception {
+        Path path = new Path(EXTERNAL_TABLE_LOCATION);
+        FileSystem fs = path.getFileSystem(new Configuration());
+
+        for (String candidatePartition : candidatePartitions) {
+            if (isTableExternal) {
+                touch(fs, EXTERNAL_TABLE_LOCATION + candidatePartition);
+            }
+
+            Map<String, String> partition = new HashMap<String, String>();
+            partition.put("ds", candidatePartition); //yyyyMMDD
+            partition.put("region", "in");
+            HCatAddPartitionDesc addPtn = HCatAddPartitionDesc.create(
+                    DATABASE_NAME, tableName, null, partition).build();
+            client.addPartition(addPtn);
+        }
+    }
+
+    private void touch(FileSystem fs, String path) throws Exception {
+        fs.create(new Path(path)).close();
+    }
+
+    private void dropPartitions(String tableName, List<String> candidatePartitions) throws Exception {
+
+        for (String candidatePartition : candidatePartitions) {
+            Map<String, String> partition = new HashMap<String, String>();
+            partition.put("ds", candidatePartition); //yyyyMMDD
+            partition.put("region", "in");
+            client.dropPartitions(DATABASE_NAME, tableName, partition, true);
+        }
+    }
+
+    public String getExpectedInstancePaths(List<String> candidatePartitions, Date date,
+                                           String dateMask, String timeZone) {
+        Collections.sort(candidatePartitions);
+        StringBuilder instances = new StringBuilder("instancePaths=");
+
+        DateFormat dateFormat = new SimpleDateFormat(FORMAT.substring(0, dateMask.length()));
+        dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
+        String startDate = dateFormat.format(date);
+
+        for (String candidatePartition : candidatePartitions) {
+            if (candidatePartition.compareTo(startDate) < 0) {
+                instances.append("[")
+                        .append(candidatePartition)
+                        .append(", in],");
+            }
+        }
+
+        return instances.toString();
+    }
+
+    private void verifyFSPartitionsAreDeleted(List<String> candidatePartitions, Date date,
+                                              String dateMask, String timeZone) throws IOException {
+
+        FileSystem fs = new Path(EXTERNAL_TABLE_LOCATION).getFileSystem(new Configuration());
+
+        DateFormat dateFormat = new SimpleDateFormat(FORMAT.substring(0, dateMask.length()));
+        dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
+        String startDate = dateFormat.format(date);
+
+        Collections.sort(candidatePartitions);
+        for (String candidatePartition : candidatePartitions) {
+            final String path = EXTERNAL_TABLE_LOCATION + "ds=" + candidatePartition + "/region=in";
+            if (candidatePartition.compareTo(startDate) < 0
+                    && fs.exists(new Path(path))) {
+                Assert.fail("Expecting " + path + " to be deleted");
+            }
+        }
+    }
+
+    private String readLogFile(Path logFile) throws IOException {
+        ByteArrayOutputStream writer = new ByteArrayOutputStream();
+        InputStream date = logFile.getFileSystem(new Configuration()).open(logFile);
+        IOUtils.copyBytes(date, writer, 4096, true);
+        return writer.toString();
+    }
+
+    private static class InMemoryWriter extends PrintStream {
+
+        private final StringBuffer buffer = new StringBuffer();
+
+        public InMemoryWriter(OutputStream out) {
+            super(out);
+        }
+
+        @Override
+        public void println(String x) {
+            buffer.append(x);
+            super.println(x);
+        }
+
+        public String getBuffer() {
+            return buffer.toString();
+        }
+
+        public void clear() {
+            buffer.delete(0, buffer.length());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedReplicationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedReplicationIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedReplicationIT.java
new file mode 100644
index 0000000..57bccf7
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedReplicationIT.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle;
+
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.util.FSUtils;
+import org.apache.falcon.util.HiveTestUtils;
+import org.apache.falcon.util.OozieTestUtils;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hcatalog.api.HCatPartition;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowJob;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Integration tests for Feed Replication with Table storage.
+ *
+ * This test is disabled as it heavily depends on Oozie sharelibs for
+ * HCatalog being made available on HDFS. This is captured in FALCON-139.
+ */
+@Test (enabled = false)
+public class TableStorageFeedReplicationIT {
+
+    private static final String SOURCE_DATABASE_NAME = "src_demo_db";
+    private static final String SOURCE_TABLE_NAME = "customer_raw";
+
+    private static final String TARGET_DATABASE_NAME = "tgt_demo_db";
+    private static final String TARGET_TABLE_NAME = "customer_bcp";
+
+    private static final String PARTITION_VALUE = "2013-10-24-00"; // ${YEAR}-${MONTH}-${DAY}-${HOUR}
+
+    private final TestContext sourceContext = new TestContext();
+    private String sourceMetastoreUrl;
+
+    private final TestContext targetContext = new TestContext();
+    private String targetMetastoreUrl;
+
+
+    @BeforeClass
+    public void setUp() throws Exception {
+        TestContext.cleanupStore();
+
+        Map<String, String> overlay = sourceContext.getUniqueOverlay();
+        String sourceFilePath = sourceContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
+        sourceContext.setCluster(sourceFilePath);
+
+        final Cluster sourceCluster = sourceContext.getCluster().getCluster();
+        String sourceStorageUrl = ClusterHelper.getStorageUrl(sourceCluster);
+
+        // copyTestDataToHDFS
+        final String sourcePath = sourceStorageUrl + "/falcon/test/input/" + PARTITION_VALUE;
+        FSUtils.copyResourceToHDFS("/apps/data/data.txt", "data.txt", sourcePath);
+
+        sourceMetastoreUrl = ClusterHelper.getInterface(sourceCluster, Interfacetype.REGISTRY).getEndpoint();
+        setupHiveMetastore(sourceMetastoreUrl, SOURCE_DATABASE_NAME, SOURCE_TABLE_NAME);
+        HiveTestUtils.loadData(sourceMetastoreUrl, SOURCE_DATABASE_NAME, SOURCE_TABLE_NAME, sourcePath,
+                PARTITION_VALUE);
+
+        String targetFilePath = targetContext.overlayParametersOverTemplate("/table/bcp-cluster.xml", overlay);
+        targetContext.setCluster(targetFilePath);
+
+        final Cluster targetCluster = targetContext.getCluster().getCluster();
+        targetMetastoreUrl = ClusterHelper.getInterface(targetCluster, Interfacetype.REGISTRY).getEndpoint();
+        setupHiveMetastore(targetMetastoreUrl, TARGET_DATABASE_NAME, TARGET_TABLE_NAME);
+
+        copyLibsToHDFS(targetCluster);
+    }
+
+    private void setupHiveMetastore(String metastoreUrl, String databaseName,
+                                    String tableName) throws Exception {
+        cleanupHiveMetastore(metastoreUrl, databaseName, tableName);
+
+        HiveTestUtils.createDatabase(metastoreUrl, databaseName);
+        final List<String> partitionKeys = Arrays.asList("ds");
+        HiveTestUtils.createTable(metastoreUrl, databaseName, tableName, partitionKeys);
+        // todo this is a kludge to work around hive's limitations
+        HiveTestUtils.alterTable(metastoreUrl, databaseName, tableName);
+    }
+
+    private void copyLibsToHDFS(Cluster cluster) throws IOException {
+        // set up kahadb to be sent as part of workflows
+        StartupProperties.get().setProperty("libext.paths", "./target/libext");
+        String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
+        String targetStorageUrl = ClusterHelper.getStorageUrl(cluster);
+        FSUtils.copyOozieShareLibsToHDFS("./target/libext", targetStorageUrl + libext);
+    }
+
+    @AfterClass
+    public void tearDown() throws Exception {
+        TestContext.executeWithURL("entity -delete -type feed -name customer-table-replicating-feed");
+        TestContext.executeWithURL("entity -delete -type cluster -name primary-cluster");
+        TestContext.executeWithURL("entity -delete -type cluster -name bcp-cluster");
+
+        cleanupHiveMetastore(sourceMetastoreUrl, SOURCE_DATABASE_NAME, SOURCE_TABLE_NAME);
+        cleanupHiveMetastore(targetMetastoreUrl, TARGET_DATABASE_NAME, TARGET_TABLE_NAME);
+
+        cleanupStagingDirs(sourceContext.getCluster().getCluster(), SOURCE_DATABASE_NAME);
+        cleanupStagingDirs(targetContext.getCluster().getCluster(), TARGET_DATABASE_NAME);
+    }
+
+    private void cleanupHiveMetastore(String metastoreUrl, String databaseName, String tableName) throws Exception {
+        HiveTestUtils.dropTable(metastoreUrl, databaseName, tableName);
+        HiveTestUtils.dropDatabase(metastoreUrl, databaseName);
+    }
+
+    private void cleanupStagingDirs(Cluster cluster, String databaseName) throws IOException {
+        FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(cluster));
+        String stagingDir = "/apps/falcon/staging/"
+                + "FALCON_FEED_REPLICATION_customer-table-replicating-feed_primary-cluster/"
+                + databaseName;
+        fs.delete(new Path(stagingDir), true);
+        fs.delete(new Path("/falcon/test/input"), true);
+    }
+
+    @Test (enabled = false)
+    public void testTableReplication() throws Exception {
+        final String feedName = "customer-table-replicating-feed";
+        final Map<String, String> overlay = sourceContext.getUniqueOverlay();
+        String filePath = sourceContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+        filePath = targetContext.overlayParametersOverTemplate("/table/bcp-cluster.xml", overlay);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
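+        // the source partition was loaded in setup; make sure it is visible before scheduling replication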
+        HCatPartition sourcePartition = HiveTestUtils.getPartition(
+                sourceMetastoreUrl, SOURCE_DATABASE_NAME, SOURCE_TABLE_NAME, "ds", PARTITION_VALUE);
+        Assert.assertNotNull(sourcePartition);
+
+        filePath = sourceContext.overlayParametersOverTemplate("/table/customer-table-replicating-feed.xml", overlay);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
+
+        // wait until the workflow job completes
+        WorkflowJob jobInfo = OozieTestUtils.getWorkflowJob(targetContext.getCluster().getCluster(),
+                OozieClient.FILTER_NAME + "=FALCON_FEED_REPLICATION_" + feedName);
+        Assert.assertEquals(jobInfo.getStatus(), WorkflowJob.Status.SUCCEEDED);
+
+        // verify that the partition exists on the target
+        HCatPartition targetPartition = HiveTestUtils.getPartition(
+                targetMetastoreUrl, TARGET_DATABASE_NAME, TARGET_TABLE_NAME, "ds", PARTITION_VALUE);
+        Assert.assertNotNull(targetPartition);
+
+        InstancesResult response = targetContext.getService().path("api/instance/running/feed/" + feedName)
+                .header("Remote-User", "guest")
+                .accept(MediaType.APPLICATION_JSON)
+                .get(InstancesResult.class);
+        Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java b/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
new file mode 100644
index 0000000..58ae4ba
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.process;
+
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.util.FSUtils;
+import org.apache.falcon.util.OozieTestUtils;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowJob;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Integration tests for the Pig processing engine.
+ *
+ * These tests are disabled as they depend heavily on the Oozie sharelibs
+ * for Pig and HCatalog being available on HDFS; tracked in FALCON-139.
+ */
+@Test (enabled = false)
+public class PigProcessIT {
+
+    private static final String CLUSTER_TEMPLATE = "/table/primary-cluster.xml";
+
+    private final TestContext context = new TestContext();
+    private Map<String, String> overlay;
+
+
+    @BeforeClass
+    public void prepare() throws Exception {
+        TestContext.prepare(CLUSTER_TEMPLATE);
+
+        overlay = context.getUniqueOverlay();
+
+        String filePath = context.overlayParametersOverTemplate(CLUSTER_TEMPLATE, overlay);
+        context.setCluster(filePath);
+
+        final Cluster cluster = context.getCluster().getCluster();
+        final String storageUrl = ClusterHelper.getStorageUrl(cluster);
+
+        copyDataAndScriptsToHDFS(storageUrl);
+        copyLibsToHDFS(cluster, storageUrl);
+    }
+
+    private void copyDataAndScriptsToHDFS(String storageUrl) throws IOException {
+        // copy the pig script to HDFS
+        FSUtils.copyResourceToHDFS(
+                "/apps/pig/id.pig", "id.pig", storageUrl + "/falcon/test/apps/pig");
+        // copy the test data to HDFS
+        FSUtils.copyResourceToHDFS(
+                "/apps/data/data.txt", "data.txt", storageUrl + "/falcon/test/input/2012/04/21/00");
+    }
+
+    private void copyLibsToHDFS(Cluster cluster, String storageUrl) throws IOException {
+        // set up kahadb to be sent as part of workflows
+        StartupProperties.get().setProperty("libext.paths", "./target/libext");
+        String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
+        FSUtils.copyOozieShareLibsToHDFS("./target/libext", storageUrl + libext);
+    }
+
+    @Test (enabled = false)
+    public void testSubmitAndSchedulePigProcess() throws Exception {
+        overlay.put("cluster", "primary-cluster");
+
+        String filePath = context.overlayParametersOverTemplate(CLUSTER_TEMPLATE, overlay);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+        // context.setCluster(filePath);
+
+        filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+        Assert.assertEquals(0,
+                TestContext.executeWithURL("entity -submit -type feed -file " + filePath));
+
+        filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+        Assert.assertEquals(0,
+                TestContext.executeWithURL("entity -submit -type feed -file " + filePath));
+
+        final String pigProcessName = "pig-" + context.getProcessName();
+        overlay.put("processName", pigProcessName);
+
+        filePath = context.overlayParametersOverTemplate(TestContext.PIG_PROCESS_TEMPLATE, overlay);
+        Assert.assertEquals(0,
+                TestContext.executeWithURL("entity -submitAndSchedule -type process -file " + filePath));
+
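+        // wait until the scheduled process workflow completes and verify that it succeeded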
+        WorkflowJob jobInfo = OozieTestUtils.getWorkflowJob(context.getCluster().getCluster(),
+                OozieClient.FILTER_NAME + "=FALCON_PROCESS_DEFAULT_" + pigProcessName);
+        Assert.assertEquals(WorkflowJob.Status.SUCCEEDED, jobInfo.getStatus());
+
+        InstancesResult response = context.getService().path("api/instance/running/process/" + pigProcessName)
+                .header("Remote-User", "guest")
+                .accept(MediaType.APPLICATION_JSON)
+                .get(InstancesResult.class);
+        Assert.assertEquals(APIResult.Status.SUCCEEDED, response.getStatus());
+
+        // verify LogMover
+        Path oozieLogPath = OozieTestUtils.getOozieLogPath(context.getCluster().getCluster(), jobInfo);
+        Assert.assertTrue(context.getCluster().getFileSystem().exists(oozieLogPath));
+
+        TestContext.executeWithURL("entity -delete -type process -name " + pigProcessName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java b/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java
new file mode 100644
index 0000000..2d539c2
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.process;
+
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.util.FSUtils;
+import org.apache.falcon.util.HiveTestUtils;
+import org.apache.falcon.util.OozieTestUtils;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hcatalog.api.HCatPartition;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowJob;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Integration tests for the Pig and Hive processing engines with both FS and table storage.
+ *
+ * These tests are disabled as they depend heavily on the Oozie sharelibs
+ * for Pig and HCatalog being available on HDFS; tracked in FALCON-139.
+ */
+@Test (enabled = false)
+public class TableStorageProcessIT {
+
+    private static final String DATABASE_NAME = "falcon_db";
+    private static final String IN_TABLE_NAME = "input_table";
+    private static final String OUT_TABLE_NAME = "output_table";
+    private static final String PARTITION_VALUE = "2012-04-21-00"; // ${YEAR}-${MONTH}-${DAY}-${HOUR}
+    private static final String CLUSTER_TEMPLATE = "/table/primary-cluster.xml";
+
+    private final TestContext context = new TestContext();
+    private Map<String, String> overlay;
+    private String metastoreUrl;
+
+    @BeforeClass
+    public void prepare() throws Exception {
+        TestContext.prepare(CLUSTER_TEMPLATE);
+
+        overlay = context.getUniqueOverlay();
+        String filePath = context.overlayParametersOverTemplate(CLUSTER_TEMPLATE, overlay);
+        context.setCluster(filePath);
+
+        final Cluster cluster = context.getCluster().getCluster();
+        final String storageUrl = ClusterHelper.getStorageUrl(cluster);
+        metastoreUrl = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).getEndpoint();
+
+        copyDataAndScriptsToHDFS(storageUrl);
+        copyLibsToHDFS(cluster, storageUrl);
+
+        setupHiveMetastore(storageUrl);
+        scheduleFeeds();
+    }
+
+    private void copyDataAndScriptsToHDFS(String storageUrl) throws IOException {
+        // copy the test data to HDFS
+        FSUtils.copyResourceToHDFS(
+                "/apps/data/data.txt", "data.txt", storageUrl + "/falcon/test/input/" + PARTITION_VALUE);
+
+        // copy the pig script to HDFS
+        FSUtils.copyResourceToHDFS(
+                "/apps/pig/table-id.pig", "table-id.pig", storageUrl + "/falcon/test/apps/pig");
+
+        // copy the hive script to HDFS
+        FSUtils.copyResourceToHDFS(
+                "/apps/hive/script.hql", "script.hql", storageUrl + "/falcon/test/apps/hive");
+    }
+
+    private void copyLibsToHDFS(Cluster cluster, String storageUrl) throws IOException {
+        // set up kahadb to be sent as part of workflows
+        StartupProperties.get().setProperty("libext.paths", "./target/libext");
+        String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
+        FSUtils.copyOozieShareLibsToHDFS("./target/libext", storageUrl + libext);
+    }
+
+    private void setupHiveMetastore(String storageUrl) throws Exception {
+        HiveTestUtils.createDatabase(metastoreUrl, DATABASE_NAME);
+        final List<String> partitionKeys = Arrays.asList("ds");
+        HiveTestUtils.createTable(metastoreUrl, DATABASE_NAME, IN_TABLE_NAME, partitionKeys);
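+        // load the sample data into the ds=2012-04-21-00 partition of the input table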
+        final String sourcePath = storageUrl + "/falcon/test/input/" + PARTITION_VALUE;
+        HiveTestUtils.loadData(metastoreUrl, DATABASE_NAME, IN_TABLE_NAME, sourcePath, PARTITION_VALUE);
+
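+        // the output table is created empty; the pig/hive processes under test populate it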
+        HiveTestUtils.createTable(metastoreUrl, DATABASE_NAME, OUT_TABLE_NAME, partitionKeys);
+    }
+
+    private void scheduleFeeds() throws Exception {
+        overlay.put("cluster", "primary-cluster");
+
+        String filePath = context.overlayParametersOverTemplate(CLUSTER_TEMPLATE, overlay);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+        filePath = context.overlayParametersOverTemplate("/table/table-feed-input.xml", overlay);
+        Assert.assertEquals(0,
+                TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
+
+        filePath = context.overlayParametersOverTemplate("/table/table-feed-output.xml", overlay);
+        Assert.assertEquals(0,
+                TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
+    }
+
+    @AfterClass
+    public void tearDown() throws Exception {
+        HiveTestUtils.dropDatabase(metastoreUrl, DATABASE_NAME);
+
+        cleanupFS(context.getCluster().getCluster());
+
+        TestContext.executeWithURL("entity -delete -type feed -name output-table");
+        TestContext.executeWithURL("entity -delete -type feed -name input-table");
+        TestContext.executeWithURL("entity -delete -type cluster -name primary-cluster");
+    }
+
+    private void cleanupFS(Cluster cluster) throws IOException {
+        FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(cluster));
+        fs.delete(new Path("/falcon/test/input/" + PARTITION_VALUE), true);
+        fs.delete(new Path("/apps/data"), true);
+        fs.delete(new Path("/apps/pig"), true);
+        fs.delete(new Path("/apps/hive"), true);
+    }
+
+    @AfterMethod
+    private void cleanHiveMetastore() throws Exception {
+        // drop and recreate the output table so the next test starts with an empty table
+        HiveTestUtils.dropTable(metastoreUrl, DATABASE_NAME, OUT_TABLE_NAME);
+        final List<String> partitionKeys = Arrays.asList("ds");
+        HiveTestUtils.createTable(metastoreUrl, DATABASE_NAME, OUT_TABLE_NAME, partitionKeys);
+    }
+
+    @Test (enabled = false)
+    public void testSubmitAndSchedulePigProcessForTableStorage() throws Exception {
+        final String pigProcessName = "pig-tables-" + context.getProcessName();
+        overlay.put("processName", pigProcessName);
+
+        String filePath = context.overlayParametersOverTemplate("/table/pig-process-tables.xml", overlay);
+        Assert.assertEquals(0,
+                TestContext.executeWithURL("entity -submitAndSchedule -type process -file " + filePath));
+
+        WorkflowJob jobInfo = OozieTestUtils.getWorkflowJob(context.getCluster().getCluster(),
+                OozieClient.FILTER_NAME + "=FALCON_PROCESS_DEFAULT_" + pigProcessName);
+        Assert.assertEquals(WorkflowJob.Status.SUCCEEDED, jobInfo.getStatus());
+
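+        // verify that the process registered the output partition in the catalog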
+        HCatPartition partition = HiveTestUtils.getPartition(
+                metastoreUrl, DATABASE_NAME, OUT_TABLE_NAME, "ds", PARTITION_VALUE);
+        Assert.assertNotNull(partition);
+
+        InstancesResult response = context.getService().path("api/instance/running/process/" + pigProcessName)
+                .header("Remote-User", "guest")
+                .accept(MediaType.APPLICATION_JSON)
+                .get(InstancesResult.class);
+        Assert.assertEquals(APIResult.Status.SUCCEEDED, response.getStatus());
+
+        TestContext.executeWithURL("entity -delete -type process -name " + pigProcessName);
+    }
+
+
+    @Test (enabled = false)
+    public void testSubmitAndScheduleHiveProcess() throws Exception {
+        final String hiveProcessName = "hive-tables-" + context.getProcessName();
+        overlay.put("processName", hiveProcessName);
+
+        String filePath = context.overlayParametersOverTemplate("/table/hive-process-template.xml", overlay);
+        Assert.assertEquals(0,
+                TestContext.executeWithURL("entity -submitAndSchedule -type process -file " + filePath));
+
+        WorkflowJob jobInfo = OozieTestUtils.getWorkflowJob(context.getCluster().getCluster(),
+                OozieClient.FILTER_NAME + "=FALCON_PROCESS_DEFAULT_" + hiveProcessName);
+        Assert.assertEquals(WorkflowJob.Status.SUCCEEDED, jobInfo.getStatus());
+
+        HCatPartition partition = HiveTestUtils.getPartition(
+                metastoreUrl, DATABASE_NAME, OUT_TABLE_NAME, "ds", PARTITION_VALUE);
+        Assert.assertNotNull(partition);
+
+        InstancesResult response = context.getService().path("api/instance/running/process/" + hiveProcessName)
+                .header("Remote-User", "guest")
+                .accept(MediaType.APPLICATION_JSON)
+                .get(InstancesResult.class);
+        Assert.assertEquals(APIResult.Status.SUCCEEDED, response.getStatus());
+
+        TestContext.executeWithURL("entity -delete -type process -name " + hiveProcessName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/webapp/src/test/java/org/apache/falcon/resource/ClusterEntityValidationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/ClusterEntityValidationIT.java b/webapp/src/test/java/org/apache/falcon/resource/ClusterEntityValidationIT.java
deleted file mode 100644
index ee4d44e..0000000
--- a/webapp/src/test/java/org/apache/falcon/resource/ClusterEntityValidationIT.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.resource;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Map;
-
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Interface;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import com.sun.jersey.api.client.ClientResponse;
-
-/**
- * Tests cluster entity validation to verify if each of the specified
- * interface endpoints are valid.
- */
-public class ClusterEntityValidationIT {
-    private final TestContext context = new TestContext();
-    private Map<String, String> overlay;
-
-
-    @BeforeClass
-    public void setup() throws Exception {
-        TestContext.prepare();
-    }
-
-    /**
-     * Positive test.
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testClusterEntityWithValidInterfaces() throws Exception {
-        overlay = context.getUniqueOverlay();
-        overlay.put("colo", "default");
-        ClientResponse response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
-        context.assertSuccessful(response);
-    }
-
-
-    @DataProvider(name = "interfaceToInvalidURLs")
-    public Object[][] createInterfaceToInvalidURLData() {
-        return new Object[][] {
-            // TODO FileSystem validates invalid hftp url, does NOT fail
-            // {Interfacetype.READONLY, "hftp://localhost:41119"},
-            {Interfacetype.READONLY, ""},
-            {Interfacetype.READONLY, "localhost:41119"},
-            {Interfacetype.WRITE, "write-interface:9999"},
-            {Interfacetype.WRITE, "hdfs://write-interface:9999"},
-            {Interfacetype.EXECUTE, "execute-interface:9999"},
-            {Interfacetype.WORKFLOW, "workflow-interface:9999/oozie/"},
-            {Interfacetype.WORKFLOW, "http://workflow-interface:9999/oozie/"},
-            {Interfacetype.MESSAGING, "messaging-interface:9999"},
-            {Interfacetype.MESSAGING, "tcp://messaging-interface:9999"},
-            {Interfacetype.REGISTRY, "catalog-interface:9999"},
-            {Interfacetype.REGISTRY, "http://catalog-interface:9999"},
-            {Interfacetype.REGISTRY, "Hcat"},
-        };
-    }
-
-    @Test (dataProvider = "interfaceToInvalidURLs")
-    public void testClusterEntityWithInvalidInterfaces(Interfacetype interfacetype, String endpoint)
-        throws Exception {
-        overlay = context.getUniqueOverlay();
-        String filePath = context.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
-        InputStream stream = new FileInputStream(filePath);
-        Cluster cluster = (Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(stream);
-        Assert.assertNotNull(cluster);
-        cluster.setColo("default");  // validations will be ignored if not default & tests fail
-
-        Interface anInterface = ClusterHelper.getInterface(cluster, interfacetype);
-        anInterface.setEndpoint(endpoint);
-
-        File tmpFile = context.getTempFile();
-        EntityType.CLUSTER.getMarshaller().marshal(cluster, tmpFile);
-        ClientResponse response = context.submitFileToFalcon(EntityType.CLUSTER, tmpFile.getAbsolutePath());
-        context.assertFailure(response);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/webapp/src/test/java/org/apache/falcon/resource/FeedEntityValidationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/FeedEntityValidationIT.java b/webapp/src/test/java/org/apache/falcon/resource/FeedEntityValidationIT.java
deleted file mode 100644
index 540691f..0000000
--- a/webapp/src/test/java/org/apache/falcon/resource/FeedEntityValidationIT.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.resource;
-
-import com.sun.jersey.api.client.ClientResponse;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.catalog.HiveCatalogService;
-import org.apache.falcon.entity.parser.EntityParserFactory;
-import org.apache.falcon.entity.parser.FeedEntityParser;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LateArrival;
-import org.apache.hcatalog.api.HCatClient;
-import org.apache.hcatalog.api.HCatCreateDBDesc;
-import org.apache.hcatalog.api.HCatCreateTableDesc;
-import org.apache.hcatalog.data.schema.HCatFieldSchema;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import javax.xml.bind.Marshaller;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Map;
-
-/**
- * Tests feed entity validation to verify if the table specified is valid.
- */
-public class FeedEntityValidationIT {
-
-    private static final String METASTORE_URL = "thrift://localhost:49083";
-    private static final String DATABASE_NAME = "falcondb";
-    private static final String TABLE_NAME = "clicks";
-    private static final String TABLE_URI =
-            "catalog:" + DATABASE_NAME + ":" + TABLE_NAME + "#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}";
-
-    private final TestContext context = new TestContext();
-    private HCatClient client;
-
-    @BeforeClass
-    public void setup() throws Exception {
-        TestContext.prepare();
-
-        client = HiveCatalogService.get(METASTORE_URL);
-
-        createDatabase();
-        createTable();
-    }
-
-    private void createDatabase() throws Exception {
-        HCatCreateDBDesc dbDesc = HCatCreateDBDesc.create(DATABASE_NAME)
-                .ifNotExists(true).build();
-        client.createDatabase(dbDesc);
-    }
-
-    public void createTable() throws Exception {
-        ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
-        cols.add(new HCatFieldSchema("id", HCatFieldSchema.Type.INT, "id comment"));
-        cols.add(new HCatFieldSchema("value", HCatFieldSchema.Type.STRING, "value comment"));
-
-        HCatCreateTableDesc tableDesc = HCatCreateTableDesc
-                .create(DATABASE_NAME, TABLE_NAME, cols)
-                .fileFormat("rcfile")
-                .ifNotExists(true)
-                .comments("falcon integration test")
-                .build();
-        client.createTable(tableDesc);
-    }
-
-    @AfterClass
-    public void tearDown() throws Exception {
-        dropTable();
-        dropDatabase();
-    }
-
-    private void dropTable() throws Exception {
-        client.dropTable(DATABASE_NAME, TABLE_NAME, true);
-    }
-
-    private void dropDatabase() throws Exception {
-        client.dropDatabase(DATABASE_NAME, true, HCatClient.DropDBMode.CASCADE);
-    }
-
-    /**
-     * Positive test.
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testFeedEntityWithValidTable() throws Exception {
-        Map<String, String> overlay = context.getUniqueOverlay();
-        overlay.put("colo", "default");
-
-        ClientResponse response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
-        context.assertSuccessful(response);
-
-        // submission will parse and validate the feed with table
-        overlay.put("tableUri", TABLE_URI);
-        response = context.submitToFalcon("/hive-table-feed.xml", overlay, EntityType.FEED);
-        context.assertSuccessful(response);
-    }
-
-    /**
-     * Late data handling test.
-     *
-     * @throws Exception
-     */
-    @Test (expectedExceptions = FalconException.class)
-    public void testFeedEntityWithValidTableAndLateArrival() throws Exception {
-        Map<String, String> overlay = context.getUniqueOverlay();
-        overlay.put("colo", "default"); // validations will be ignored if not default & tests fail
-        overlay.put("tableUri", TABLE_URI);
-
-        String filePath = context.overlayParametersOverTemplate("/hive-table-feed.xml", overlay);
-        InputStream stream = new FileInputStream(filePath);
-        FeedEntityParser parser = (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
-        Feed feed = parser.parse(stream);
-        Assert.assertNotNull(feed);
-
-        final LateArrival lateArrival = new LateArrival();
-        lateArrival.setCutOff(new Frequency("4", Frequency.TimeUnit.hours));
-        feed.setLateArrival(lateArrival);
-
-        StringWriter stringWriter = new StringWriter();
-        Marshaller marshaller = EntityType.FEED.getMarshaller();
-        marshaller.marshal(feed, stringWriter);
-        System.out.println(stringWriter.toString());
-        parser.parseAndValidate(stringWriter.toString());
-    }
-
-    @DataProvider(name = "invalidTableUris")
-    public Object[][] createInvalidTableUriData() {
-        return new Object[][] {
-            // does not match with group input's frequency
-            {"catalog:" + DATABASE_NAME + ":" + TABLE_NAME + "#ds=ds=${YEAR}-${MONTH}-${DAY}", ""},
-            {"catalog:" + DATABASE_NAME + ":" + TABLE_NAME + "#ds=ds=${YEAR}-${MONTH}-${DAY}", ""},
-            {"badscheme:" + DATABASE_NAME + ":" + TABLE_NAME + "#ds=ds=${YEAR}-${MONTH}-${DAY}", ""},
-            {"catalog:" + DATABASE_NAME + ":" + "badtable" + "#ds=ds=${YEAR}-${MONTH}-${DAY}", ""},
-            {"catalog:" + "baddb" + ":" + TABLE_NAME + "#ds=ds=${YEAR}-${MONTH}-${DAY}", ""},
-            {"catalog:" + "baddb" + ":" + "badtable" + "#ds=ds=${YEAR}-${MONTH}-${DAY}", ""},
-        };
-    }
-
-    @Test (dataProvider = "invalidTableUris")
-    public void testFeedEntityWithInvalidTableUri(String tableUri, String ignore)
-        throws Exception {
-
-        Map<String, String> overlay = context.getUniqueOverlay();
-        overlay.put("colo", "default");
-
-        ClientResponse response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
-        context.assertSuccessful(response);
-
-        // submission will parse and validate the feed with table
-        overlay.put("tableUri", tableUri);
-        response = context.submitToFalcon("/hive-table-feed.xml", overlay, EntityType.FEED);
-        context.assertFailure(response);
-    }
-}