Posted to commits@falcon.apache.org by ve...@apache.org on 2013/11/02 00:42:08 UTC
[07/13] FALCON-95 Enable embedding hive scripts directly in a
process. Contributed by Venkatesh Seetharam
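For context on the change itself: FALCON-95 lets a Falcon process entity point its workflow directly at a Hive script on HDFS, rather than at an Oozie workflow or Pig script. A minimal sketch of such a process definition follows; the entity name, paths, and validity window are illustrative and not taken from this patch:

    <process name="hive-process" xmlns="uri:falcon:process:0.1">
        <clusters>
            <cluster name="primary-cluster">
                <validity start="2013-11-01T00:00Z" end="2013-12-01T00:00Z"/>
            </cluster>
        </clusters>
        <parallel>1</parallel>
        <order>FIFO</order>
        <frequency>days(1)</frequency>
        <timezone>UTC</timezone>
        <!-- FALCON-95: engine="hive" embeds the referenced .hql script in the process -->
        <workflow engine="hive" path="/apps/hive/script.hql"/>
        <retry policy="periodic" delay="minutes(10)" attempts="3"/>
    </process>

The integration tests below exercise the table-storage feed infrastructure (eviction and replication) that such Hive-backed processes read from and write to.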
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/webapp/src/test/java/org/apache/falcon/catalog/TableStorageFeedEvictorIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/catalog/TableStorageFeedEvictorIT.java b/webapp/src/test/java/org/apache/falcon/catalog/TableStorageFeedEvictorIT.java
deleted file mode 100644
index bddb3fa..0000000
--- a/webapp/src/test/java/org/apache/falcon/catalog/TableStorageFeedEvictorIT.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.catalog;
-
-import org.apache.commons.el.ExpressionEvaluatorImpl;
-import org.apache.falcon.Pair;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.retention.FeedEvictor;
-import org.apache.falcon.util.HiveTestUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hcatalog.api.HCatAddPartitionDesc;
-import org.apache.hcatalog.api.HCatClient;
-import org.apache.hcatalog.api.HCatPartition;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import javax.servlet.jsp.el.ELException;
-import javax.servlet.jsp.el.ExpressionEvaluator;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TimeZone;
-
-/**
- * Test for FeedEvictor for table.
- */
-public class TableStorageFeedEvictorIT {
-
- private static final ExpressionEvaluator EVALUATOR = new ExpressionEvaluatorImpl();
- private static final ExpressionHelper RESOLVER = ExpressionHelper.get();
-
- private static final String METASTORE_URL = "thrift://localhost:49083/";
- private static final String DATABASE_NAME = "falcon_db";
- private static final String TABLE_NAME = "clicks";
- private static final String EXTERNAL_TABLE_NAME = "clicks_external";
- private static final String EXTERNAL_TABLE_LOCATION = "hdfs://localhost:41020/falcon/staging/clicks_external/";
- private static final String FORMAT = "yyyyMMddHHmm";
-
- private final InMemoryWriter stream = new InMemoryWriter(System.out);
-
- private HCatClient client;
-
- @BeforeClass
- public void setUp() throws Exception {
- FeedEvictor.OUT.set(stream);
-
- client = HiveCatalogService.get(METASTORE_URL);
-
- HiveTestUtils.createDatabase(METASTORE_URL, DATABASE_NAME);
- final List<String> partitionKeys = Arrays.asList("ds", "region");
- HiveTestUtils.createTable(METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionKeys);
- HiveTestUtils.createExternalTable(METASTORE_URL, DATABASE_NAME, EXTERNAL_TABLE_NAME,
- partitionKeys, EXTERNAL_TABLE_LOCATION);
- }
-
- @AfterClass
- public void close() throws Exception {
- HiveTestUtils.dropTable(METASTORE_URL, DATABASE_NAME, EXTERNAL_TABLE_NAME);
- HiveTestUtils.dropTable(METASTORE_URL, DATABASE_NAME, TABLE_NAME);
- HiveTestUtils.dropDatabase(METASTORE_URL, DATABASE_NAME);
- }
-
- @DataProvider (name = "retentionLimitDataProvider")
- private Object[][] createRetentionLimitData() {
- return new Object[][] {
- {"days(10)", 4},
- {"days(100)", 7},
- };
- }
-
- @Test (dataProvider = "retentionLimitDataProvider")
- public void testFeedEvictorForTable(String retentionLimit, int expectedSize) throws Exception {
-
- final String timeZone = "UTC";
- final String dateMask = "yyyyMMdd";
-
- Pair<Date, Date> range = getDateRange(retentionLimit);
- List<String> candidatePartitions = getCandidatePartitions("days(10)", dateMask, timeZone, 3);
- addPartitions(TABLE_NAME, candidatePartitions, false);
-
- try {
- stream.clear();
-
- final String tableUri = DATABASE_NAME + "/" + TABLE_NAME + "/ds=${YEAR}${MONTH}${DAY};region=us";
- String feedBasePath = METASTORE_URL + tableUri;
- String logFile = "hdfs://localhost:41020/falcon/staging/feed/instancePaths-2013-09-13-01-00.csv";
-
- FeedEvictor.main(new String[]{
- "-feedBasePath", feedBasePath,
- "-retentionType", "instance",
- "-retentionLimit", retentionLimit,
- "-timeZone", timeZone,
- "-frequency", "daily",
- "-logFile", logFile,
- "-falconFeedStorageType", Storage.TYPE.TABLE.name(),
- });
-
- List<HCatPartition> partitions = client.getPartitions(DATABASE_NAME, TABLE_NAME);
- Assert.assertEquals(partitions.size(), expectedSize, "Unexpected number of partitions");
-
- Assert.assertEquals(readLogFile(new Path(logFile)),
- getExpectedInstancePaths(candidatePartitions, range.first, dateMask, timeZone));
-
- } catch (Exception e) {
- Assert.fail("Unknown exception", e);
- } finally {
- dropPartitions(TABLE_NAME, candidatePartitions);
- }
- }
-
- @Test (dataProvider = "retentionLimitDataProvider")
- public void testFeedEvictorForExternalTable(String retentionLimit, int expectedSize) throws Exception {
-
- final String timeZone = "UTC";
- final String dateMask = "yyyyMMdd";
-
- Pair<Date, Date> range = getDateRange(retentionLimit);
- List<String> candidatePartitions = getCandidatePartitions("days(10)", dateMask, timeZone, 3);
- addPartitions(EXTERNAL_TABLE_NAME, candidatePartitions, true);
-
- try {
- stream.clear();
-
- final String tableUri = DATABASE_NAME + "/" + EXTERNAL_TABLE_NAME + "/ds=${YEAR}${MONTH}${DAY};region=us";
- String feedBasePath = METASTORE_URL + tableUri;
- String logFile = "hdfs://localhost:41020/falcon/staging/feed/instancePaths-2013-09-13-01-00.csv";
-
- FeedEvictor.main(new String[]{
- "-feedBasePath", feedBasePath,
- "-retentionType", "instance",
- "-retentionLimit", retentionLimit,
- "-timeZone", timeZone,
- "-frequency", "daily",
- "-logFile", logFile,
- "-falconFeedStorageType", Storage.TYPE.TABLE.name(),
- });
-
- List<HCatPartition> partitions = client.getPartitions(DATABASE_NAME, EXTERNAL_TABLE_NAME);
- Assert.assertEquals(partitions.size(), expectedSize, "Unexpected number of partitions");
-
- Assert.assertEquals(readLogFile(new Path(logFile)),
- getExpectedInstancePaths(candidatePartitions, range.first, dateMask, timeZone));
-
- verifyFSPartitionsAreDeleted(candidatePartitions, range.first, dateMask, timeZone);
-
- } catch (Exception e) {
- Assert.fail("Unknown exception", e);
- } finally {
- dropPartitions(EXTERNAL_TABLE_NAME, candidatePartitions);
- }
- }
-
- public List<String> getCandidatePartitions(String retentionLimit, String dateMask,
- String timeZone, int limit) throws Exception {
- List<String> partitions = new ArrayList<String>();
-
- Pair<Date, Date> range = getDateRange(retentionLimit);
-
- DateFormat dateFormat = new SimpleDateFormat(FORMAT.substring(0, dateMask.length()));
- dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
-
- String startDate = dateFormat.format(range.first);
- partitions.add(startDate);
-
- Calendar calendar = Calendar.getInstance();
- calendar.setTime(range.first);
- for (int i = 1; i <= limit; i++) {
- calendar.add(Calendar.DAY_OF_MONTH, -(i + 1));
- partitions.add(dateFormat.format(calendar.getTime()));
- }
-
- calendar.setTime(range.second);
- for (int i = 1; i <= limit; i++) {
- calendar.add(Calendar.DAY_OF_MONTH, -(i + 1));
- partitions.add(dateFormat.format(calendar.getTime()));
- }
-
- return partitions;
- }
-
- private Pair<Date, Date> getDateRange(String period) throws ELException {
- Long duration = (Long) EVALUATOR.evaluate("${" + period + "}",
- Long.class, RESOLVER, RESOLVER);
- Date end = new Date();
- Date start = new Date(end.getTime() - duration);
- return Pair.of(start, end);
- }
-
- private void addPartitions(String tableName, List<String> candidatePartitions,
- boolean isTableExternal) throws Exception {
- Path path = new Path(EXTERNAL_TABLE_LOCATION);
- FileSystem fs = path.getFileSystem(new Configuration());
-
- for (String candidatePartition : candidatePartitions) {
- if (isTableExternal) {
- touch(fs, EXTERNAL_TABLE_LOCATION + candidatePartition);
- }
-
- Map<String, String> partition = new HashMap<String, String>();
- partition.put("ds", candidatePartition); //yyyyMMDD
- partition.put("region", "in");
- HCatAddPartitionDesc addPtn = HCatAddPartitionDesc.create(
- DATABASE_NAME, tableName, null, partition).build();
- client.addPartition(addPtn);
- }
- }
-
- private void touch(FileSystem fs, String path) throws Exception {
- fs.create(new Path(path)).close();
- }
-
- private void dropPartitions(String tableName, List<String> candidatePartitions) throws Exception {
-
- for (String candidatePartition : candidatePartitions) {
- Map<String, String> partition = new HashMap<String, String>();
- partition.put("ds", candidatePartition); //yyyyMMDD
- partition.put("region", "in");
- client.dropPartitions(DATABASE_NAME, tableName, partition, true);
- }
- }
-
- public String getExpectedInstancePaths(List<String> candidatePartitions, Date date,
- String dateMask, String timeZone) {
- Collections.sort(candidatePartitions);
- StringBuilder instances = new StringBuilder("instancePaths=");
-
- DateFormat dateFormat = new SimpleDateFormat(FORMAT.substring(0, dateMask.length()));
- dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
- String startDate = dateFormat.format(date);
-
- for (String candidatePartition : candidatePartitions) {
- if (candidatePartition.compareTo(startDate) < 0) {
- instances.append("[")
- .append(candidatePartition)
- .append(", in],");
- }
- }
-
- return instances.toString();
- }
-
- private void verifyFSPartitionsAreDeleted(List<String> candidatePartitions, Date date,
- String dateMask, String timeZone) throws IOException {
-
- FileSystem fs = new Path(EXTERNAL_TABLE_LOCATION).getFileSystem(new Configuration());
-
- DateFormat dateFormat = new SimpleDateFormat(FORMAT.substring(0, dateMask.length()));
- dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
- String startDate = dateFormat.format(date);
-
- Collections.sort(candidatePartitions);
- for (String candidatePartition : candidatePartitions) {
- final String path = EXTERNAL_TABLE_LOCATION + "ds=" + candidatePartition + "/region=in";
- if (candidatePartition.compareTo(startDate) < 0
- && fs.exists(new Path(path))) {
- Assert.fail("Expecting " + path + " to be deleted");
- }
- }
- }
-
- private String readLogFile(Path logFile) throws IOException {
- ByteArrayOutputStream writer = new ByteArrayOutputStream();
- InputStream date = logFile.getFileSystem(new Configuration()).open(logFile);
- IOUtils.copyBytes(date, writer, 4096, true);
- return writer.toString();
- }
-
- private static class InMemoryWriter extends PrintStream {
-
- private final StringBuffer buffer = new StringBuffer();
-
- public InMemoryWriter(OutputStream out) {
- super(out);
- }
-
- @Override
- public void println(String x) {
- buffer.append(x);
- super.println(x);
- }
-
- public String getBuffer() {
- return buffer.toString();
- }
-
- public void clear() {
- buffer.delete(0, buffer.length());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/webapp/src/test/java/org/apache/falcon/catalog/TableStorageFeedReplicationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/catalog/TableStorageFeedReplicationIT.java b/webapp/src/test/java/org/apache/falcon/catalog/TableStorageFeedReplicationIT.java
deleted file mode 100644
index a736ea1..0000000
--- a/webapp/src/test/java/org/apache/falcon/catalog/TableStorageFeedReplicationIT.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.catalog;
-
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.resource.APIResult;
-import org.apache.falcon.resource.InstancesResult;
-import org.apache.falcon.resource.TestContext;
-import org.apache.falcon.util.HiveTestUtils;
-import org.apache.falcon.util.OozieTestUtils;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hcatalog.api.HCatPartition;
-import org.apache.oozie.client.OozieClient;
-import org.apache.oozie.client.WorkflowJob;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import javax.ws.rs.core.MediaType;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Integration tests for Feed Replication with Table storage.
- *
- * This test is disabled as it heavily depends on oozie sharelibs for
- * hcatalog being made available on HDFS. captured in FALCON-139.
- */
-@Test (enabled = false)
-public class TableStorageFeedReplicationIT {
-
- private static final String SOURCE_DATABASE_NAME = "src_demo_db";
- private static final String SOURCE_TABLE_NAME = "customer_raw";
-
- private static final String TARGET_DATABASE_NAME = "tgt_demo_db";
- private static final String TARGET_TABLE_NAME = "customer_bcp";
-
- private static final String PARTITION_VALUE = "2013-09-24-00"; // ${YEAR}-${MONTH}-${DAY}-${HOUR}
-
- private final TestContext sourceContext = new TestContext();
- private String sourceMetastoreUrl;
-
- private final TestContext targetContext = new TestContext();
- private String targetMetastoreUrl;
-
-
- @BeforeClass
- public void setUp() throws Exception {
- TestContext.cleanupStore();
-
- Map<String, String> overlay = sourceContext.getUniqueOverlay();
- String sourceFilePath = sourceContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
- sourceContext.setCluster(sourceFilePath);
-
- final Cluster sourceCluster = sourceContext.getCluster().getCluster();
- String sourceStorageUrl = ClusterHelper.getStorageUrl(sourceCluster);
-
- // copyTestDataToHDFS
- final String sourcePath = sourceStorageUrl + "/falcon/test/input/" + PARTITION_VALUE;
- TestContext.copyResourceToHDFS("/apps/pig/data.txt", "data.txt", sourcePath);
-
- sourceMetastoreUrl = ClusterHelper.getInterface(sourceCluster, Interfacetype.REGISTRY).getEndpoint();
- setupHiveMetastore(sourceMetastoreUrl, SOURCE_DATABASE_NAME, SOURCE_TABLE_NAME);
- HiveTestUtils.loadData(sourceMetastoreUrl, SOURCE_DATABASE_NAME, SOURCE_TABLE_NAME, sourcePath,
- PARTITION_VALUE);
-
- String targetFilePath = targetContext.overlayParametersOverTemplate("/table/bcp-cluster.xml", overlay);
- targetContext.setCluster(targetFilePath);
-
- final Cluster targetCluster = targetContext.getCluster().getCluster();
- targetMetastoreUrl = ClusterHelper.getInterface(targetCluster, Interfacetype.REGISTRY).getEndpoint();
- setupHiveMetastore(targetMetastoreUrl, TARGET_DATABASE_NAME, TARGET_TABLE_NAME);
-
- // set up kahadb to be sent as part of workflows
- StartupProperties.get().setProperty("libext.paths", "./target/libext");
- String libext = ClusterHelper.getLocation(targetCluster, "working") + "/libext";
- String targetStorageUrl = ClusterHelper.getStorageUrl(targetCluster);
- TestContext.copyOozieShareLibsToHDFS("./target/libext", targetStorageUrl + libext);
- }
-
- private void setupHiveMetastore(String metastoreUrl, String databaseName,
- String tableName) throws Exception {
- cleanupHiveMetastore(metastoreUrl, databaseName, tableName);
-
- HiveTestUtils.createDatabase(metastoreUrl, databaseName);
- final List<String> partitionKeys = Arrays.asList("ds");
- HiveTestUtils.createTable(metastoreUrl, databaseName, tableName, partitionKeys);
- // todo this is a kludge to work around hive's limitations
- HiveTestUtils.alterTable(metastoreUrl, databaseName, tableName);
- }
-
- @AfterClass
- public void tearDown() throws Exception {
- TestContext.executeWithURL("entity -delete -type feed -name customer-table-replicating-feed");
- TestContext.executeWithURL("entity -delete -type cluster -name primary-cluster");
- TestContext.executeWithURL("entity -delete -type cluster -name bcp-cluster");
-
- cleanupHiveMetastore(sourceMetastoreUrl, SOURCE_DATABASE_NAME, SOURCE_TABLE_NAME);
- cleanupHiveMetastore(targetMetastoreUrl, TARGET_DATABASE_NAME, TARGET_TABLE_NAME);
-
- cleanupStagingDirs(sourceContext.getCluster().getCluster(), SOURCE_DATABASE_NAME);
- cleanupStagingDirs(targetContext.getCluster().getCluster(), TARGET_DATABASE_NAME);
- }
-
- private void cleanupHiveMetastore(String metastoreUrl, String databaseName, String tableName) throws Exception {
- HiveTestUtils.dropTable(metastoreUrl, databaseName, tableName);
- HiveTestUtils.dropDatabase(metastoreUrl, databaseName);
- }
-
- private void cleanupStagingDirs(Cluster cluster, String databaseName) throws IOException {
- FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(cluster));
- String stagingDir = "/apps/falcon/staging/"
- + "FALCON_FEED_REPLICATION_customer-table-replicating-feed_primary-cluster/"
- + databaseName;
- fs.delete(new Path(stagingDir), true);
- }
-
- @Test (enabled = false)
- public void testTableReplication() throws Exception {
- final String feedName = "customer-table-replicating-feed";
- final Map<String, String> overlay = sourceContext.getUniqueOverlay();
- String filePath = sourceContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
- Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
-
- filePath = targetContext.overlayParametersOverTemplate("/table/bcp-cluster.xml", overlay);
- Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
-
- HCatPartition sourcePartition = HiveTestUtils.getPartition(
- sourceMetastoreUrl, SOURCE_DATABASE_NAME, SOURCE_TABLE_NAME, "ds", PARTITION_VALUE);
- Assert.assertNotNull(sourcePartition);
-
- filePath = sourceContext.overlayParametersOverTemplate("/table/customer-table-replicating-feed.xml", overlay);
- Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
-
- // wait until the workflow job completes
- WorkflowJob jobInfo = OozieTestUtils.getWorkflowJob(targetContext.getCluster().getCluster(),
- OozieClient.FILTER_NAME + "=FALCON_FEED_REPLICATION_" + feedName);
- Assert.assertEquals(WorkflowJob.Status.SUCCEEDED, jobInfo.getStatus());
-
- // verify if the partition on the target exists
- HCatPartition targetPartition = HiveTestUtils.getPartition(
- targetMetastoreUrl, TARGET_DATABASE_NAME, TARGET_TABLE_NAME, "ds", PARTITION_VALUE);
- Assert.assertNotNull(targetPartition);
-
- InstancesResult response = targetContext.getService().path("api/instance/running/feed/" + feedName)
- .header("Remote-User", "guest")
- .accept(MediaType.APPLICATION_JSON)
- .get(InstancesResult.class);
- Assert.assertEquals(APIResult.Status.SUCCEEDED, response.getStatus());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java
new file mode 100644
index 0000000..f00bdd7
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/FileSystemFeedReplicationIT.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle;
+
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.util.FSUtils;
+import org.apache.falcon.util.OozieTestUtils;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowJob;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Integration tests for Feed Replication with filesystem storage.
+ *
+ * This test is disabled as it heavily depends on oozie sharelibs for
+ * hcatalog being made available on HDFS; this is captured in FALCON-139.
+ */
+@Test (enabled = false)
+public class FileSystemFeedReplicationIT {
+
+ private static final String PARTITION_VALUE = "2013-10-24-00"; // ${YEAR}-${MONTH}-${DAY}-${HOUR}
+ private static final String SOURCE_LOCATION = "/falcon/test/primary-cluster/customer_raw/";
+ private static final String TARGET_LOCATION = "/falcon/test/bcp-cluster/customer_bcp/";
+
+ private final TestContext sourceContext = new TestContext();
+ private final TestContext targetContext = new TestContext();
+
+ private final TestContext targetAlphaContext = new TestContext();
+ private final TestContext targetBetaContext = new TestContext();
+ private final TestContext targetGammaContext = new TestContext();
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ TestContext.cleanupStore();
+
+ Map<String, String> overlay = sourceContext.getUniqueOverlay();
+ String sourceFilePath = sourceContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
+ sourceContext.setCluster(sourceFilePath);
+
+ final Cluster sourceCluster = sourceContext.getCluster().getCluster();
+ String sourceStorageUrl = ClusterHelper.getStorageUrl(sourceCluster);
+
+ // copyTestDataToHDFS
+ final String sourcePath = sourceStorageUrl + SOURCE_LOCATION + PARTITION_VALUE;
+ FSUtils.copyResourceToHDFS("/apps/data/data.txt", "data.txt", sourcePath);
+
+ String targetFilePath = targetContext.overlayParametersOverTemplate("/table/bcp-cluster.xml", overlay);
+ targetContext.setCluster(targetFilePath);
+
+ final Cluster targetCluster = targetContext.getCluster().getCluster();
+ copyLibsToHDFS(targetCluster);
+
+ String file = targetAlphaContext.overlayParametersOverTemplate("/table/target-cluster-alpha.xml", overlay);
+ targetAlphaContext.setCluster(file);
+ copyLibsToHDFS(targetAlphaContext.getCluster().getCluster());
+
+ file = targetBetaContext.overlayParametersOverTemplate("/table/target-cluster-beta.xml", overlay);
+ targetBetaContext.setCluster(file);
+ copyLibsToHDFS(targetBetaContext.getCluster().getCluster());
+
+ file = targetGammaContext.overlayParametersOverTemplate("/table/target-cluster-gamma.xml", overlay);
+ targetGammaContext.setCluster(file);
+ copyLibsToHDFS(targetGammaContext.getCluster().getCluster());
+ }
+
+ private void copyLibsToHDFS(Cluster cluster) throws IOException {
+ // set up kahadb to be sent as part of workflows
+ StartupProperties.get().setProperty("libext.paths", "./target/libext");
+ String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
+ String targetStorageUrl = ClusterHelper.getStorageUrl(cluster);
+ FSUtils.copyOozieShareLibsToHDFS("./target/libext", targetStorageUrl + libext);
+ }
+
+ @AfterClass
+ public void tearDown() throws Exception {
+ TestContext.executeWithURL("entity -delete -type feed -name customer-fs-replicating-feed");
+ TestContext.executeWithURL("entity -delete -type cluster -name primary-cluster");
+ TestContext.executeWithURL("entity -delete -type cluster -name bcp-cluster");
+
+ cleanupStagingDirs(sourceContext.getCluster().getCluster());
+ cleanupStagingDirs(targetContext.getCluster().getCluster());
+
+ cleanupStagingDirs(targetAlphaContext.getCluster().getCluster());
+ cleanupStagingDirs(targetBetaContext.getCluster().getCluster());
+ cleanupStagingDirs(targetGammaContext.getCluster().getCluster());
+ }
+
+ private void cleanupStagingDirs(Cluster cluster) throws IOException {
+ FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(cluster));
+ fs.delete(new Path("/falcon/test"), true);
+ }
+
+ @Test (enabled = false)
+ public void testFSReplicationSingleSourceToTarget() throws Exception {
+ final Map<String, String> overlay = sourceContext.getUniqueOverlay();
+ String filePath = sourceContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
+ Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+ filePath = targetContext.overlayParametersOverTemplate("/table/bcp-cluster.xml", overlay);
+ Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+ // verify if the partition on the source exists - precondition
+ FileSystem sourceFS = FileSystem.get(ClusterHelper.getConfiguration(sourceContext.getCluster().getCluster()));
+ Assert.assertTrue(sourceFS.exists(new Path(SOURCE_LOCATION + PARTITION_VALUE)));
+
+ filePath = sourceContext.overlayParametersOverTemplate("/table/customer-fs-replicating-feed.xml", overlay);
+ Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
+
+ // wait until the workflow job completes
+ final String feedName = "customer-fs-replicating-feed";
+ WorkflowJob jobInfo = OozieTestUtils.getWorkflowJob(targetContext.getCluster().getCluster(),
+ OozieClient.FILTER_NAME + "=FALCON_FEED_REPLICATION_" + feedName);
+ Assert.assertEquals(jobInfo.getStatus(), WorkflowJob.Status.SUCCEEDED);
+
+ Assert.assertTrue(sourceFS.exists(new Path(SOURCE_LOCATION + PARTITION_VALUE)));
+ // verify if the partition on the target exists
+ FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(targetContext.getCluster().getCluster()));
+ Assert.assertTrue(fs.exists(new Path(TARGET_LOCATION + PARTITION_VALUE)));
+
+ InstancesResult response = targetContext.getService().path("api/instance/running/feed/" + feedName)
+ .header("Remote-User", "guest")
+ .accept(MediaType.APPLICATION_JSON)
+ .get(InstancesResult.class);
+ Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED);
+ }
+
+ @Test (enabled = false)
+ public void testFSReplicationSingleSourceToMultipleTargets() throws Exception {
+ final Map<String, String> overlay = sourceContext.getUniqueOverlay();
+ String filePath = sourceContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
+ Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+ filePath = targetContext.overlayParametersOverTemplate("/table/target-cluster-alpha.xml", overlay);
+ Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+ filePath = targetContext.overlayParametersOverTemplate("/table/target-cluster-beta.xml", overlay);
+ Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+ filePath = targetContext.overlayParametersOverTemplate("/table/target-cluster-gamma.xml", overlay);
+ Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+ // verify if the partition on the source exists - precondition
+ FileSystem sourceFS = FileSystem.get(ClusterHelper.getConfiguration(sourceContext.getCluster().getCluster()));
+ Assert.assertTrue(sourceFS.exists(new Path(SOURCE_LOCATION + PARTITION_VALUE)));
+
+ filePath = sourceContext.overlayParametersOverTemplate("/table/multiple-targets-replicating-feed.xml", overlay);
+ Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
+
+ // wait until the workflow job completes
+ final String feedName = "multiple-targets-replicating-feed";
+ WorkflowJob jobInfo = OozieTestUtils.getWorkflowJob(targetContext.getCluster().getCluster(),
+ OozieClient.FILTER_NAME + "=FALCON_FEED_REPLICATION_" + feedName);
+ Assert.assertEquals(jobInfo.getStatus(), WorkflowJob.Status.SUCCEEDED);
+
+ Assert.assertTrue(sourceFS.exists(new Path(SOURCE_LOCATION + PARTITION_VALUE)));
+
+ // verify if the partition on the target exists
+ FileSystem alpha = FileSystem.get(ClusterHelper.getConfiguration(targetAlphaContext.getCluster().getCluster()));
+ Assert.assertTrue(
+ alpha.exists(new Path("/falcon/test/target-cluster-alpha/customer_alpha/" + PARTITION_VALUE)));
+
+ FileSystem beta = FileSystem.get(ClusterHelper.getConfiguration(targetBetaContext.getCluster().getCluster()));
+ Assert.assertTrue(beta.exists(new Path("/falcon/test/target-cluster-beta/customer_beta/" + PARTITION_VALUE)));
+
+ FileSystem gamma = FileSystem.get(ClusterHelper.getConfiguration(targetGammaContext.getCluster().getCluster()));
+ Assert.assertTrue(
+ gamma.exists(new Path("/falcon/test/target-cluster-gamma/customer_gamma/" + PARTITION_VALUE)));
+
+ InstancesResult response = targetContext.getService().path("api/instance/running/feed/" + feedName)
+ .header("Remote-User", "guest")
+ .accept(MediaType.APPLICATION_JSON)
+ .get(InstancesResult.class);
+ Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
new file mode 100644
index 0000000..4d8ced0
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle;
+
+import org.apache.commons.el.ExpressionEvaluatorImpl;
+import org.apache.falcon.Pair;
+import org.apache.falcon.catalog.HiveCatalogService;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.retention.FeedEvictor;
+import org.apache.falcon.util.HiveTestUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hcatalog.api.HCatAddPartitionDesc;
+import org.apache.hcatalog.api.HCatClient;
+import org.apache.hcatalog.api.HCatPartition;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import javax.servlet.jsp.el.ELException;
+import javax.servlet.jsp.el.ExpressionEvaluator;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+/**
+ * Test for FeedEvictor with table storage.
+ */
+public class TableStorageFeedEvictorIT {
+
+ private static final ExpressionEvaluator EVALUATOR = new ExpressionEvaluatorImpl();
+ private static final ExpressionHelper RESOLVER = ExpressionHelper.get();
+
+ private static final String METASTORE_URL = "thrift://localhost:49083/";
+ private static final String DATABASE_NAME = "falcon_db";
+ private static final String TABLE_NAME = "clicks";
+ private static final String EXTERNAL_TABLE_NAME = "clicks_external";
+ private static final String EXTERNAL_TABLE_LOCATION = "hdfs://localhost:41020/falcon/staging/clicks_external/";
+ private static final String FORMAT = "yyyyMMddHHmm";
+
+ private final InMemoryWriter stream = new InMemoryWriter(System.out);
+
+ private HCatClient client;
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ FeedEvictor.OUT.set(stream);
+
+ client = HiveCatalogService.get(METASTORE_URL);
+
+ HiveTestUtils.createDatabase(METASTORE_URL, DATABASE_NAME);
+ final List<String> partitionKeys = Arrays.asList("ds", "region");
+ HiveTestUtils.createTable(METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionKeys);
+ HiveTestUtils.createExternalTable(METASTORE_URL, DATABASE_NAME, EXTERNAL_TABLE_NAME,
+ partitionKeys, EXTERNAL_TABLE_LOCATION);
+ }
+
+ @AfterClass
+ public void close() throws Exception {
+ HiveTestUtils.dropTable(METASTORE_URL, DATABASE_NAME, EXTERNAL_TABLE_NAME);
+ HiveTestUtils.dropTable(METASTORE_URL, DATABASE_NAME, TABLE_NAME);
+ HiveTestUtils.dropDatabase(METASTORE_URL, DATABASE_NAME);
+ }
+
+ @DataProvider (name = "retentionLimitDataProvider")
+ private Object[][] createRetentionLimitData() {
+ return new Object[][] {
+ {"days(10)", 4},
+ {"days(100)", 7},
+ };
+ }
+
+ @Test (dataProvider = "retentionLimitDataProvider")
+ public void testFeedEvictorForTable(String retentionLimit, int expectedSize) throws Exception {
+
+ final String timeZone = "UTC";
+ final String dateMask = "yyyyMMdd";
+
+ Pair<Date, Date> range = getDateRange(retentionLimit);
+ List<String> candidatePartitions = getCandidatePartitions("days(10)", dateMask, timeZone, 3);
+ addPartitions(TABLE_NAME, candidatePartitions, false);
+
+ try {
+ stream.clear();
+
+ final String tableUri = DATABASE_NAME + "/" + TABLE_NAME + "/ds=${YEAR}${MONTH}${DAY};region=us";
+ String feedBasePath = METASTORE_URL + tableUri;
+ String logFile = "hdfs://localhost:41020/falcon/staging/feed/instancePaths-2013-09-13-01-00.csv";
+
+ FeedEvictor.main(new String[]{
+ "-feedBasePath", feedBasePath,
+ "-retentionType", "instance",
+ "-retentionLimit", retentionLimit,
+ "-timeZone", timeZone,
+ "-frequency", "daily",
+ "-logFile", logFile,
+ "-falconFeedStorageType", Storage.TYPE.TABLE.name(),
+ });
+
+ List<HCatPartition> partitions = client.getPartitions(DATABASE_NAME, TABLE_NAME);
+ Assert.assertEquals(partitions.size(), expectedSize, "Unexpected number of partitions");
+
+ Assert.assertEquals(readLogFile(new Path(logFile)),
+ getExpectedInstancePaths(candidatePartitions, range.first, dateMask, timeZone));
+
+ } catch (Exception e) {
+ Assert.fail("Unknown exception", e);
+ } finally {
+ dropPartitions(TABLE_NAME, candidatePartitions);
+ }
+ }
+
+ @Test (dataProvider = "retentionLimitDataProvider")
+ public void testFeedEvictorForExternalTable(String retentionLimit, int expectedSize) throws Exception {
+
+ final String timeZone = "UTC";
+ final String dateMask = "yyyyMMdd";
+
+ Pair<Date, Date> range = getDateRange(retentionLimit);
+ List<String> candidatePartitions = getCandidatePartitions("days(10)", dateMask, timeZone, 3);
+ addPartitions(EXTERNAL_TABLE_NAME, candidatePartitions, true);
+
+ try {
+ stream.clear();
+
+ final String tableUri = DATABASE_NAME + "/" + EXTERNAL_TABLE_NAME + "/ds=${YEAR}${MONTH}${DAY};region=us";
+ String feedBasePath = METASTORE_URL + tableUri;
+ String logFile = "hdfs://localhost:41020/falcon/staging/feed/instancePaths-2013-09-13-01-00.csv";
+
+ FeedEvictor.main(new String[]{
+ "-feedBasePath", feedBasePath,
+ "-retentionType", "instance",
+ "-retentionLimit", retentionLimit,
+ "-timeZone", timeZone,
+ "-frequency", "daily",
+ "-logFile", logFile,
+ "-falconFeedStorageType", Storage.TYPE.TABLE.name(),
+ });
+
+ List<HCatPartition> partitions = client.getPartitions(DATABASE_NAME, EXTERNAL_TABLE_NAME);
+ Assert.assertEquals(partitions.size(), expectedSize, "Unexpected number of partitions");
+
+ Assert.assertEquals(readLogFile(new Path(logFile)),
+ getExpectedInstancePaths(candidatePartitions, range.first, dateMask, timeZone));
+
+ verifyFSPartitionsAreDeleted(candidatePartitions, range.first, dateMask, timeZone);
+
+ } catch (Exception e) {
+ Assert.fail("Unknown exception", e);
+ } finally {
+ dropPartitions(EXTERNAL_TABLE_NAME, candidatePartitions);
+ }
+ }
+
+ public List<String> getCandidatePartitions(String retentionLimit, String dateMask,
+ String timeZone, int limit) throws Exception {
+ List<String> partitions = new ArrayList<String>();
+
+ Pair<Date, Date> range = getDateRange(retentionLimit);
+
+ DateFormat dateFormat = new SimpleDateFormat(FORMAT.substring(0, dateMask.length()));
+ dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
+
+ String startDate = dateFormat.format(range.first);
+ partitions.add(startDate);
+
+ Calendar calendar = Calendar.getInstance();
+ calendar.setTime(range.first);
+ for (int i = 1; i <= limit; i++) {
+ calendar.add(Calendar.DAY_OF_MONTH, -(i + 1));
+ partitions.add(dateFormat.format(calendar.getTime()));
+ }
+
+ calendar.setTime(range.second);
+ for (int i = 1; i <= limit; i++) {
+ calendar.add(Calendar.DAY_OF_MONTH, -(i + 1));
+ partitions.add(dateFormat.format(calendar.getTime()));
+ }
+
+ return partitions;
+ }
+
+ private Pair<Date, Date> getDateRange(String period) throws ELException {
+ Long duration = (Long) EVALUATOR.evaluate("${" + period + "}",
+ Long.class, RESOLVER, RESOLVER);
+ Date end = new Date();
+ Date start = new Date(end.getTime() - duration);
+ return Pair.of(start, end);
+ }
+
+ private void addPartitions(String tableName, List<String> candidatePartitions,
+ boolean isTableExternal) throws Exception {
+ Path path = new Path(EXTERNAL_TABLE_LOCATION);
+ FileSystem fs = path.getFileSystem(new Configuration());
+
+ for (String candidatePartition : candidatePartitions) {
+ if (isTableExternal) {
+ touch(fs, EXTERNAL_TABLE_LOCATION + candidatePartition);
+ }
+
+ Map<String, String> partition = new HashMap<String, String>();
+ partition.put("ds", candidatePartition); //yyyyMMdd
+ partition.put("region", "in");
+ HCatAddPartitionDesc addPtn = HCatAddPartitionDesc.create(
+ DATABASE_NAME, tableName, null, partition).build();
+ client.addPartition(addPtn);
+ }
+ }
+
+ private void touch(FileSystem fs, String path) throws Exception {
+ fs.create(new Path(path)).close();
+ }
+
+ private void dropPartitions(String tableName, List<String> candidatePartitions) throws Exception {
+
+ for (String candidatePartition : candidatePartitions) {
+ Map<String, String> partition = new HashMap<String, String>();
+ partition.put("ds", candidatePartition); //yyyyMMdd
+ partition.put("region", "in");
+ client.dropPartitions(DATABASE_NAME, tableName, partition, true);
+ }
+ }
+
+ public String getExpectedInstancePaths(List<String> candidatePartitions, Date date,
+ String dateMask, String timeZone) {
+ Collections.sort(candidatePartitions);
+ StringBuilder instances = new StringBuilder("instancePaths=");
+
+ DateFormat dateFormat = new SimpleDateFormat(FORMAT.substring(0, dateMask.length()));
+ dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
+ String startDate = dateFormat.format(date);
+
+ for (String candidatePartition : candidatePartitions) {
+ if (candidatePartition.compareTo(startDate) < 0) {
+ instances.append("[")
+ .append(candidatePartition)
+ .append(", in],");
+ }
+ }
+
+ return instances.toString();
+ }
+
+ private void verifyFSPartitionsAreDeleted(List<String> candidatePartitions, Date date,
+ String dateMask, String timeZone) throws IOException {
+
+ FileSystem fs = new Path(EXTERNAL_TABLE_LOCATION).getFileSystem(new Configuration());
+
+ DateFormat dateFormat = new SimpleDateFormat(FORMAT.substring(0, dateMask.length()));
+ dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
+ String startDate = dateFormat.format(date);
+
+ Collections.sort(candidatePartitions);
+ for (String candidatePartition : candidatePartitions) {
+ final String path = EXTERNAL_TABLE_LOCATION + "ds=" + candidatePartition + "/region=in";
+ if (candidatePartition.compareTo(startDate) < 0
+ && fs.exists(new Path(path))) {
+ Assert.fail("Expecting " + path + " to be deleted");
+ }
+ }
+ }
+
+ private String readLogFile(Path logFile) throws IOException {
+ ByteArrayOutputStream writer = new ByteArrayOutputStream();
+ InputStream date = logFile.getFileSystem(new Configuration()).open(logFile);
+ IOUtils.copyBytes(date, writer, 4096, true);
+ return writer.toString();
+ }
+
+ private static class InMemoryWriter extends PrintStream {
+
+ private final StringBuffer buffer = new StringBuffer();
+
+ public InMemoryWriter(OutputStream out) {
+ super(out);
+ }
+
+ @Override
+ public void println(String x) {
+ buffer.append(x);
+ super.println(x);
+ }
+
+ public String getBuffer() {
+ return buffer.toString();
+ }
+
+ public void clear() {
+ buffer.delete(0, buffer.length());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedReplicationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedReplicationIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedReplicationIT.java
new file mode 100644
index 0000000..57bccf7
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedReplicationIT.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle;
+
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.util.FSUtils;
+import org.apache.falcon.util.HiveTestUtils;
+import org.apache.falcon.util.OozieTestUtils;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hcatalog.api.HCatPartition;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowJob;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Integration tests for Feed Replication with Table storage.
+ *
+ * This test is disabled as it heavily depends on oozie sharelibs for
+ * hcatalog being made available on HDFS; this is captured in FALCON-139.
+ */
+@Test (enabled = false)
+public class TableStorageFeedReplicationIT {
+
+ private static final String SOURCE_DATABASE_NAME = "src_demo_db";
+ private static final String SOURCE_TABLE_NAME = "customer_raw";
+
+ private static final String TARGET_DATABASE_NAME = "tgt_demo_db";
+ private static final String TARGET_TABLE_NAME = "customer_bcp";
+
+ private static final String PARTITION_VALUE = "2013-10-24-00"; // ${YEAR}-${MONTH}-${DAY}-${HOUR}
+
+ private final TestContext sourceContext = new TestContext();
+ private String sourceMetastoreUrl;
+
+ private final TestContext targetContext = new TestContext();
+ private String targetMetastoreUrl;
+
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ TestContext.cleanupStore();
+
+ Map<String, String> overlay = sourceContext.getUniqueOverlay();
+ String sourceFilePath = sourceContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
+ sourceContext.setCluster(sourceFilePath);
+
+ final Cluster sourceCluster = sourceContext.getCluster().getCluster();
+ String sourceStorageUrl = ClusterHelper.getStorageUrl(sourceCluster);
+
+ // copyTestDataToHDFS
+ final String sourcePath = sourceStorageUrl + "/falcon/test/input/" + PARTITION_VALUE;
+ FSUtils.copyResourceToHDFS("/apps/data/data.txt", "data.txt", sourcePath);
+
+ sourceMetastoreUrl = ClusterHelper.getInterface(sourceCluster, Interfacetype.REGISTRY).getEndpoint();
+ setupHiveMetastore(sourceMetastoreUrl, SOURCE_DATABASE_NAME, SOURCE_TABLE_NAME);
+ HiveTestUtils.loadData(sourceMetastoreUrl, SOURCE_DATABASE_NAME, SOURCE_TABLE_NAME, sourcePath,
+ PARTITION_VALUE);
+
+ String targetFilePath = targetContext.overlayParametersOverTemplate("/table/bcp-cluster.xml", overlay);
+ targetContext.setCluster(targetFilePath);
+
+ final Cluster targetCluster = targetContext.getCluster().getCluster();
+ targetMetastoreUrl = ClusterHelper.getInterface(targetCluster, Interfacetype.REGISTRY).getEndpoint();
+ setupHiveMetastore(targetMetastoreUrl, TARGET_DATABASE_NAME, TARGET_TABLE_NAME);
+
+ copyLibsToHDFS(targetCluster);
+ }
+
+ private void setupHiveMetastore(String metastoreUrl, String databaseName,
+ String tableName) throws Exception {
+ cleanupHiveMetastore(metastoreUrl, databaseName, tableName);
+
+ HiveTestUtils.createDatabase(metastoreUrl, databaseName);
+ final List<String> partitionKeys = Arrays.asList("ds");
+ HiveTestUtils.createTable(metastoreUrl, databaseName, tableName, partitionKeys);
+ // todo this is a kludge to work around hive's limitations
+ HiveTestUtils.alterTable(metastoreUrl, databaseName, tableName);
+ }
+
+ private void copyLibsToHDFS(Cluster cluster) throws IOException {
+ // set up kahadb to be sent as part of workflows
+ StartupProperties.get().setProperty("libext.paths", "./target/libext");
+ String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
+ String targetStorageUrl = ClusterHelper.getStorageUrl(cluster);
+ FSUtils.copyOozieShareLibsToHDFS("./target/libext", targetStorageUrl + libext);
+ }
+
+ @AfterClass
+ public void tearDown() throws Exception {
+ TestContext.executeWithURL("entity -delete -type feed -name customer-table-replicating-feed");
+ TestContext.executeWithURL("entity -delete -type cluster -name primary-cluster");
+ TestContext.executeWithURL("entity -delete -type cluster -name bcp-cluster");
+
+ cleanupHiveMetastore(sourceMetastoreUrl, SOURCE_DATABASE_NAME, SOURCE_TABLE_NAME);
+ cleanupHiveMetastore(targetMetastoreUrl, TARGET_DATABASE_NAME, TARGET_TABLE_NAME);
+
+ cleanupStagingDirs(sourceContext.getCluster().getCluster(), SOURCE_DATABASE_NAME);
+ cleanupStagingDirs(targetContext.getCluster().getCluster(), TARGET_DATABASE_NAME);
+ }
+
+ private void cleanupHiveMetastore(String metastoreUrl, String databaseName, String tableName) throws Exception {
+ HiveTestUtils.dropTable(metastoreUrl, databaseName, tableName);
+ HiveTestUtils.dropDatabase(metastoreUrl, databaseName);
+ }
+
+ private void cleanupStagingDirs(Cluster cluster, String databaseName) throws IOException {
+ FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(cluster));
+ String stagingDir = "/apps/falcon/staging/"
+ + "FALCON_FEED_REPLICATION_customer-table-replicating-feed_primary-cluster/"
+ + databaseName;
+ fs.delete(new Path(stagingDir), true);
+ fs.delete(new Path("/falcon/test/input"), true);
+ }
+
+ @Test (enabled = false)
+ public void testTableReplication() throws Exception {
+ final String feedName = "customer-table-replicating-feed";
+ final Map<String, String> overlay = sourceContext.getUniqueOverlay();
+ String filePath = sourceContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
+ Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+ filePath = targetContext.overlayParametersOverTemplate("/table/bcp-cluster.xml", overlay);
+ Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+ HCatPartition sourcePartition = HiveTestUtils.getPartition(
+ sourceMetastoreUrl, SOURCE_DATABASE_NAME, SOURCE_TABLE_NAME, "ds", PARTITION_VALUE);
+ Assert.assertNotNull(sourcePartition);
+
+ filePath = sourceContext.overlayParametersOverTemplate("/table/customer-table-replicating-feed.xml", overlay);
+ Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
+
+ // wait until the workflow job completes
+ WorkflowJob jobInfo = OozieTestUtils.getWorkflowJob(targetContext.getCluster().getCluster(),
+ OozieClient.FILTER_NAME + "=FALCON_FEED_REPLICATION_" + feedName);
+ Assert.assertEquals(jobInfo.getStatus(), WorkflowJob.Status.SUCCEEDED);
+
+ // verify if the partition on the target exists
+ HCatPartition targetPartition = HiveTestUtils.getPartition(
+ targetMetastoreUrl, TARGET_DATABASE_NAME, TARGET_TABLE_NAME, "ds", PARTITION_VALUE);
+ Assert.assertNotNull(targetPartition);
+
+ InstancesResult response = targetContext.getService().path("api/instance/running/feed/" + feedName)
+ .header("Remote-User", "guest")
+ .accept(MediaType.APPLICATION_JSON)
+ .get(InstancesResult.class);
+ Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java b/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
new file mode 100644
index 0000000..58ae4ba
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.process;
+
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.util.FSUtils;
+import org.apache.falcon.util.OozieTestUtils;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowJob;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Integration tests for Pig Processing Engine.
+ *
+ * This test is disabled as it heavily depends on oozie sharelibs for
+ * pig and hcatalog being made available on HDFS; this is captured in FALCON-139.
+ */
+@Test (enabled = false)
+public class PigProcessIT {
+
+ private static final String CLUSTER_TEMPLATE = "/table/primary-cluster.xml";
+
+ private final TestContext context = new TestContext();
+ private Map<String, String> overlay;
+
+
+ @BeforeClass
+ public void prepare() throws Exception {
+ TestContext.prepare(CLUSTER_TEMPLATE);
+
+ overlay = context.getUniqueOverlay();
+
+ String filePath = context.overlayParametersOverTemplate(CLUSTER_TEMPLATE, overlay);
+ context.setCluster(filePath);
+
+ final Cluster cluster = context.getCluster().getCluster();
+ final String storageUrl = ClusterHelper.getStorageUrl(cluster);
+
+ copyDataAndScriptsToHDFS(storageUrl);
+ copyLibsToHDFS(cluster, storageUrl);
+ }
+
+ private void copyDataAndScriptsToHDFS(String storageUrl) throws IOException {
+ // copyPigScriptToHDFS
+ FSUtils.copyResourceToHDFS(
+ "/apps/pig/id.pig", "id.pig", storageUrl + "/falcon/test/apps/pig");
+ // copyTestDataToHDFS
+ FSUtils.copyResourceToHDFS(
+ "/apps/data/data.txt", "data.txt", storageUrl + "/falcon/test/input/2012/04/21/00");
+ }
+
+ private void copyLibsToHDFS(Cluster cluster, String storageUrl) throws IOException {
+ // set up kahadb to be sent as part of workflows
+ StartupProperties.get().setProperty("libext.paths", "./target/libext");
+ String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
+ FSUtils.copyOozieShareLibsToHDFS("./target/libext", storageUrl + libext);
+ }
+
+ @Test (enabled = false)
+ public void testSubmitAndSchedulePigProcess() throws Exception {
+ overlay.put("cluster", "primary-cluster");
+
+ String filePath = context.overlayParametersOverTemplate(CLUSTER_TEMPLATE, overlay);
+ Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+ // context.setCluster(filePath);
+
+ filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+ Assert.assertEquals(0,
+ TestContext.executeWithURL("entity -submit -type feed -file " + filePath));
+
+ filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+ Assert.assertEquals(0,
+ TestContext.executeWithURL("entity -submit -type feed -file " + filePath));
+
+ final String pigProcessName = "pig-" + context.getProcessName();
+ overlay.put("processName", pigProcessName);
+
+ filePath = context.overlayParametersOverTemplate(TestContext.PIG_PROCESS_TEMPLATE, overlay);
+ Assert.assertEquals(0,
+ TestContext.executeWithURL("entity -submitAndSchedule -type process -file " + filePath));
+
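+ // the scheduled workflow is named FALCON_PROCESS_DEFAULT_<process name>;
+ // look it up in oozie and expect a successful run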
+ WorkflowJob jobInfo = OozieTestUtils.getWorkflowJob(context.getCluster().getCluster(),
+ OozieClient.FILTER_NAME + "=FALCON_PROCESS_DEFAULT_" + pigProcessName);
+ Assert.assertEquals(WorkflowJob.Status.SUCCEEDED, jobInfo.getStatus());
+
+ InstancesResult response = context.getService().path("api/instance/running/process/" + pigProcessName)
+ .header("Remote-User", "guest")
+ .accept(MediaType.APPLICATION_JSON)
+ .get(InstancesResult.class);
+ Assert.assertEquals(APIResult.Status.SUCCEEDED, response.getStatus());
+
+ // verify that the LogMover copied the oozie job log for this workflow to HDFS
+ Path oozieLogPath = OozieTestUtils.getOozieLogPath(context.getCluster().getCluster(), jobInfo);
+ Assert.assertTrue(context.getCluster().getFileSystem().exists(oozieLogPath));
+
+ TestContext.executeWithURL("entity -delete -type process -name " + pigProcessName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java b/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java
new file mode 100644
index 0000000..2d539c2
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.process;
+
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.util.FSUtils;
+import org.apache.falcon.util.HiveTestUtils;
+import org.apache.falcon.util.OozieTestUtils;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hcatalog.api.HCatPartition;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowJob;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Integration tests for the Pig and Hive processing engines with both FS and table storage.
+ *
+ * This test is disabled as it depends heavily on the Oozie sharelibs for
+ * Pig and HCatalog being available on HDFS; tracked in FALCON-139.
+ */
+@Test (enabled = false)
+public class TableStorageProcessIT {
+
+ private static final String DATABASE_NAME = "falcon_db";
+ private static final String IN_TABLE_NAME = "input_table";
+ private static final String OUT_TABLE_NAME = "output_table";
+ private static final String PARTITION_VALUE = "2012-04-21-00"; // ${YEAR}-${MONTH}-${DAY}-${HOUR}
+ private static final String CLUSTER_TEMPLATE = "/table/primary-cluster.xml";
+
+ private final TestContext context = new TestContext();
+ private Map<String, String> overlay;
+ private String metastoreUrl;
+
+ @BeforeClass
+ public void prepare() throws Exception {
+ TestContext.prepare(CLUSTER_TEMPLATE);
+
+ overlay = context.getUniqueOverlay();
+ String filePath = context.overlayParametersOverTemplate(CLUSTER_TEMPLATE, overlay);
+ context.setCluster(filePath);
+
+ final Cluster cluster = context.getCluster().getCluster();
+ final String storageUrl = ClusterHelper.getStorageUrl(cluster);
+ metastoreUrl = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).getEndpoint();
+
+ copyDataAndScriptsToHDFS(storageUrl);
+ copyLibsToHDFS(cluster, storageUrl);
+
+ setupHiveMetastore(storageUrl);
+ scheduleFeeds();
+ }
+
+ private void copyDataAndScriptsToHDFS(String storageUrl) throws IOException {
+ // copy the test data to HDFS
+ FSUtils.copyResourceToHDFS(
+ "/apps/data/data.txt", "data.txt", storageUrl + "/falcon/test/input/" + PARTITION_VALUE);
+
+ // copy the Pig script to HDFS
+ FSUtils.copyResourceToHDFS(
+ "/apps/pig/table-id.pig", "table-id.pig", storageUrl + "/falcon/test/apps/pig");
+
+ // copy the Hive script to HDFS
+ FSUtils.copyResourceToHDFS(
+ "/apps/hive/script.hql", "script.hql", storageUrl + "/falcon/test/apps/hive");
+ }
+
+ private void copyLibsToHDFS(Cluster cluster, String storageUrl) throws IOException {
+ // ship the jars staged under target/libext (e.g. kahadb) with workflows via libext.paths
+ StartupProperties.get().setProperty("libext.paths", "./target/libext");
+ String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
+ FSUtils.copyOozieShareLibsToHDFS("./target/libext", storageUrl + libext);
+ }
+
+ private void setupHiveMetastore(String storageUrl) throws Exception {
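+ // create falcon_db with input and output tables partitioned on "ds", and load
+ // the test data into the input partition for 2012-04-21-00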
+ HiveTestUtils.createDatabase(metastoreUrl, DATABASE_NAME);
+ final List<String> partitionKeys = Arrays.asList("ds");
+ HiveTestUtils.createTable(metastoreUrl, DATABASE_NAME, IN_TABLE_NAME, partitionKeys);
+ final String sourcePath = storageUrl + "/falcon/test/input/" + PARTITION_VALUE;
+ HiveTestUtils.loadData(metastoreUrl, DATABASE_NAME, IN_TABLE_NAME, sourcePath, PARTITION_VALUE);
+
+ HiveTestUtils.createTable(metastoreUrl, DATABASE_NAME, OUT_TABLE_NAME, partitionKeys);
+ }
+
+ private void scheduleFeeds() throws Exception {
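+ // the cluster entity must be submitted before the feeds that reference it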
+ overlay.put("cluster", "primary-cluster");
+
+ String filePath = context.overlayParametersOverTemplate(CLUSTER_TEMPLATE, overlay);
+ Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+ filePath = context.overlayParametersOverTemplate("/table/table-feed-input.xml", overlay);
+ Assert.assertEquals(0,
+ TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
+
+ filePath = context.overlayParametersOverTemplate("/table/table-feed-output.xml", overlay);
+ Assert.assertEquals(0,
+ TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
+ }
+
+ @AfterClass
+ public void tearDown() throws Exception {
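+ // drop the hive database, remove the test data from HDFS, then delete the falcon entities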
+ HiveTestUtils.dropDatabase(metastoreUrl, DATABASE_NAME);
+
+ cleanupFS(context.getCluster().getCluster());
+
+ TestContext.executeWithURL("entity -delete -type feed -name output-table");
+ TestContext.executeWithURL("entity -delete -type feed -name input-table");
+ TestContext.executeWithURL("entity -delete -type cluster -name primary-cluster");
+ }
+
+ private void cleanupFS(Cluster cluster) throws IOException {
+ FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(cluster));
+ fs.delete(new Path("/falcon/test/input/" + PARTITION_VALUE), true);
+ fs.delete(new Path("/apps/data"), true);
+ fs.delete(new Path("/apps/pig"), true);
+ fs.delete(new Path("/apps/hive"), true);
+ }
+
+ @AfterMethod
+ public void cleanHiveMetastore() throws Exception {
+ // drop and recreate the output table so the next test starts from a clean slate
+ HiveTestUtils.dropTable(metastoreUrl, DATABASE_NAME, OUT_TABLE_NAME);
+ final List<String> partitionKeys = Arrays.asList("ds");
+ HiveTestUtils.createTable(metastoreUrl, DATABASE_NAME, OUT_TABLE_NAME, partitionKeys);
+ }
+
+ @Test (enabled = false)
+ public void testSubmitAndSchedulePigProcessForTableStorage() throws Exception {
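+ // schedule the pig process over the hcatalog-backed feeds and verify both the
+ // oozie run and the resulting output partition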
+ final String pigProcessName = "pig-tables-" + context.getProcessName();
+ overlay.put("processName", pigProcessName);
+
+ String filePath = context.overlayParametersOverTemplate("/table/pig-process-tables.xml", overlay);
+ Assert.assertEquals(0,
+ TestContext.executeWithURL("entity -submitAndSchedule -type process -file " + filePath));
+
+ WorkflowJob jobInfo = OozieTestUtils.getWorkflowJob(context.getCluster().getCluster(),
+ OozieClient.FILTER_NAME + "=FALCON_PROCESS_DEFAULT_" + pigProcessName);
+ Assert.assertEquals(WorkflowJob.Status.SUCCEEDED, jobInfo.getStatus());
+
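+ // success is also observable in the metastore: the run should have created
+ // the output partition for this dataset instance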
+ HCatPartition partition = HiveTestUtils.getPartition(
+ metastoreUrl, DATABASE_NAME, OUT_TABLE_NAME, "ds", PARTITION_VALUE);
+ Assert.assertNotNull(partition);
+
+ InstancesResult response = context.getService().path("api/instance/running/process/" + pigProcessName)
+ .header("Remote-User", "guest")
+ .accept(MediaType.APPLICATION_JSON)
+ .get(InstancesResult.class);
+ Assert.assertEquals(APIResult.Status.SUCCEEDED, response.getStatus());
+
+ TestContext.executeWithURL("entity -delete -type process -name " + pigProcessName);
+ }
+
+ @Test (enabled = false)
+ public void testSubmitAndScheduleHiveProcess() throws Exception {
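+ // same flow as the pig variant, but drives the hive process template
+ // (FALCON-95) and verifies the same output partition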
+ final String hiveProcessName = "hive-tables-" + context.getProcessName();
+ overlay.put("processName", hiveProcessName);
+
+ String filePath = context.overlayParametersOverTemplate("/table/hive-process-template.xml", overlay);
+ Assert.assertEquals(0,
+ TestContext.executeWithURL("entity -submitAndSchedule -type process -file " + filePath));
+
+ WorkflowJob jobInfo = OozieTestUtils.getWorkflowJob(context.getCluster().getCluster(),
+ OozieClient.FILTER_NAME + "=FALCON_PROCESS_DEFAULT_" + hiveProcessName);
+ Assert.assertEquals(WorkflowJob.Status.SUCCEEDED, jobInfo.getStatus());
+
+ HCatPartition partition = HiveTestUtils.getPartition(
+ metastoreUrl, DATABASE_NAME, OUT_TABLE_NAME, "ds", PARTITION_VALUE);
+ Assert.assertNotNull(partition);
+
+ InstancesResult response = context.getService().path("api/instance/running/process/" + hiveProcessName)
+ .header("Remote-User", "guest")
+ .accept(MediaType.APPLICATION_JSON)
+ .get(InstancesResult.class);
+ Assert.assertEquals(APIResult.Status.SUCCEEDED, response.getStatus());
+
+ TestContext.executeWithURL("entity -delete -type process -name " + hiveProcessName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/webapp/src/test/java/org/apache/falcon/resource/ClusterEntityValidationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/ClusterEntityValidationIT.java b/webapp/src/test/java/org/apache/falcon/resource/ClusterEntityValidationIT.java
deleted file mode 100644
index ee4d44e..0000000
--- a/webapp/src/test/java/org/apache/falcon/resource/ClusterEntityValidationIT.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.resource;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Map;
-
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Interface;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import com.sun.jersey.api.client.ClientResponse;
-
-/**
- * Tests cluster entity validation to verify if each of the specified
- * interface endpoints are valid.
- */
-public class ClusterEntityValidationIT {
- private final TestContext context = new TestContext();
- private Map<String, String> overlay;
-
-
- @BeforeClass
- public void setup() throws Exception {
- TestContext.prepare();
- }
-
- /**
- * Positive test.
- *
- * @throws Exception
- */
- @Test
- public void testClusterEntityWithValidInterfaces() throws Exception {
- overlay = context.getUniqueOverlay();
- overlay.put("colo", "default");
- ClientResponse response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
- context.assertSuccessful(response);
- }
-
-
- @DataProvider(name = "interfaceToInvalidURLs")
- public Object[][] createInterfaceToInvalidURLData() {
- return new Object[][] {
- // TODO FileSystem validates invalid hftp url, does NOT fail
- // {Interfacetype.READONLY, "hftp://localhost:41119"},
- {Interfacetype.READONLY, ""},
- {Interfacetype.READONLY, "localhost:41119"},
- {Interfacetype.WRITE, "write-interface:9999"},
- {Interfacetype.WRITE, "hdfs://write-interface:9999"},
- {Interfacetype.EXECUTE, "execute-interface:9999"},
- {Interfacetype.WORKFLOW, "workflow-interface:9999/oozie/"},
- {Interfacetype.WORKFLOW, "http://workflow-interface:9999/oozie/"},
- {Interfacetype.MESSAGING, "messaging-interface:9999"},
- {Interfacetype.MESSAGING, "tcp://messaging-interface:9999"},
- {Interfacetype.REGISTRY, "catalog-interface:9999"},
- {Interfacetype.REGISTRY, "http://catalog-interface:9999"},
- {Interfacetype.REGISTRY, "Hcat"},
- };
- }
-
- @Test (dataProvider = "interfaceToInvalidURLs")
- public void testClusterEntityWithInvalidInterfaces(Interfacetype interfacetype, String endpoint)
- throws Exception {
- overlay = context.getUniqueOverlay();
- String filePath = context.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
- InputStream stream = new FileInputStream(filePath);
- Cluster cluster = (Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(stream);
- Assert.assertNotNull(cluster);
- cluster.setColo("default"); // validations will be ignored if not default & tests fail
-
- Interface anInterface = ClusterHelper.getInterface(cluster, interfacetype);
- anInterface.setEndpoint(endpoint);
-
- File tmpFile = context.getTempFile();
- EntityType.CLUSTER.getMarshaller().marshal(cluster, tmpFile);
- ClientResponse response = context.submitFileToFalcon(EntityType.CLUSTER, tmpFile.getAbsolutePath());
- context.assertFailure(response);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/webapp/src/test/java/org/apache/falcon/resource/FeedEntityValidationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/FeedEntityValidationIT.java b/webapp/src/test/java/org/apache/falcon/resource/FeedEntityValidationIT.java
deleted file mode 100644
index 540691f..0000000
--- a/webapp/src/test/java/org/apache/falcon/resource/FeedEntityValidationIT.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.resource;
-
-import com.sun.jersey.api.client.ClientResponse;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.catalog.HiveCatalogService;
-import org.apache.falcon.entity.parser.EntityParserFactory;
-import org.apache.falcon.entity.parser.FeedEntityParser;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LateArrival;
-import org.apache.hcatalog.api.HCatClient;
-import org.apache.hcatalog.api.HCatCreateDBDesc;
-import org.apache.hcatalog.api.HCatCreateTableDesc;
-import org.apache.hcatalog.data.schema.HCatFieldSchema;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import javax.xml.bind.Marshaller;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Map;
-
-/**
- * Tests feed entity validation to verify if the table specified is valid.
- */
-public class FeedEntityValidationIT {
-
- private static final String METASTORE_URL = "thrift://localhost:49083";
- private static final String DATABASE_NAME = "falcondb";
- private static final String TABLE_NAME = "clicks";
- private static final String TABLE_URI =
- "catalog:" + DATABASE_NAME + ":" + TABLE_NAME + "#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}";
-
- private final TestContext context = new TestContext();
- private HCatClient client;
-
- @BeforeClass
- public void setup() throws Exception {
- TestContext.prepare();
-
- client = HiveCatalogService.get(METASTORE_URL);
-
- createDatabase();
- createTable();
- }
-
- private void createDatabase() throws Exception {
- HCatCreateDBDesc dbDesc = HCatCreateDBDesc.create(DATABASE_NAME)
- .ifNotExists(true).build();
- client.createDatabase(dbDesc);
- }
-
- public void createTable() throws Exception {
- ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
- cols.add(new HCatFieldSchema("id", HCatFieldSchema.Type.INT, "id comment"));
- cols.add(new HCatFieldSchema("value", HCatFieldSchema.Type.STRING, "value comment"));
-
- HCatCreateTableDesc tableDesc = HCatCreateTableDesc
- .create(DATABASE_NAME, TABLE_NAME, cols)
- .fileFormat("rcfile")
- .ifNotExists(true)
- .comments("falcon integration test")
- .build();
- client.createTable(tableDesc);
- }
-
- @AfterClass
- public void tearDown() throws Exception {
- dropTable();
- dropDatabase();
- }
-
- private void dropTable() throws Exception {
- client.dropTable(DATABASE_NAME, TABLE_NAME, true);
- }
-
- private void dropDatabase() throws Exception {
- client.dropDatabase(DATABASE_NAME, true, HCatClient.DropDBMode.CASCADE);
- }
-
- /**
- * Positive test.
- *
- * @throws Exception
- */
- @Test
- public void testFeedEntityWithValidTable() throws Exception {
- Map<String, String> overlay = context.getUniqueOverlay();
- overlay.put("colo", "default");
-
- ClientResponse response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
- context.assertSuccessful(response);
-
- // submission will parse and validate the feed with table
- overlay.put("tableUri", TABLE_URI);
- response = context.submitToFalcon("/hive-table-feed.xml", overlay, EntityType.FEED);
- context.assertSuccessful(response);
- }
-
- /**
- * Late data handling test.
- *
- * @throws Exception
- */
- @Test (expectedExceptions = FalconException.class)
- public void testFeedEntityWithValidTableAndLateArrival() throws Exception {
- Map<String, String> overlay = context.getUniqueOverlay();
- overlay.put("colo", "default"); // validations will be ignored if not default & tests fail
- overlay.put("tableUri", TABLE_URI);
-
- String filePath = context.overlayParametersOverTemplate("/hive-table-feed.xml", overlay);
- InputStream stream = new FileInputStream(filePath);
- FeedEntityParser parser = (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
- Feed feed = parser.parse(stream);
- Assert.assertNotNull(feed);
-
- final LateArrival lateArrival = new LateArrival();
- lateArrival.setCutOff(new Frequency("4", Frequency.TimeUnit.hours));
- feed.setLateArrival(lateArrival);
-
- StringWriter stringWriter = new StringWriter();
- Marshaller marshaller = EntityType.FEED.getMarshaller();
- marshaller.marshal(feed, stringWriter);
- System.out.println(stringWriter.toString());
- parser.parseAndValidate(stringWriter.toString());
- }
-
- @DataProvider(name = "invalidTableUris")
- public Object[][] createInvalidTableUriData() {
- return new Object[][] {
- // does not match with group input's frequency
- {"catalog:" + DATABASE_NAME + ":" + TABLE_NAME + "#ds=ds=${YEAR}-${MONTH}-${DAY}", ""},
- {"catalog:" + DATABASE_NAME + ":" + TABLE_NAME + "#ds=ds=${YEAR}-${MONTH}-${DAY}", ""},
- {"badscheme:" + DATABASE_NAME + ":" + TABLE_NAME + "#ds=ds=${YEAR}-${MONTH}-${DAY}", ""},
- {"catalog:" + DATABASE_NAME + ":" + "badtable" + "#ds=ds=${YEAR}-${MONTH}-${DAY}", ""},
- {"catalog:" + "baddb" + ":" + TABLE_NAME + "#ds=ds=${YEAR}-${MONTH}-${DAY}", ""},
- {"catalog:" + "baddb" + ":" + "badtable" + "#ds=ds=${YEAR}-${MONTH}-${DAY}", ""},
- };
- }
-
- @Test (dataProvider = "invalidTableUris")
- public void testFeedEntityWithInvalidTableUri(String tableUri, String ignore)
- throws Exception {
-
- Map<String, String> overlay = context.getUniqueOverlay();
- overlay.put("colo", "default");
-
- ClientResponse response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
- context.assertSuccessful(response);
-
- // submission will parse and validate the feed with table
- overlay.put("tableUri", tableUri);
- response = context.submitToFalcon("/hive-table-feed.xml", overlay, EntityType.FEED);
- context.assertFailure(response);
- }
-}