You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sr...@apache.org on 2013/11/11 05:49:22 UTC
[1/2] git commit: FALCON-180 Disable table replication for multiple
sources. Contributed by Venkatesh Seetharam. FALCON-179 Table replication must
drop partition before import as late reruns fail. Contributed by Venkatesh
Seetharam. FALCON-182 Disable specifying partitions in inputs with table storage
for process. Contributed by Venkatesh Seetharam.
Updated Branches:
refs/heads/FALCON-85 b7e678bb1 -> 382781549
FALCON-180 Disable table replication for multiple sources. Contributed by Venkatesh Seetharam
FALCON-179 Table replication must drop partition before import as late reruns fail. Contributed by Venkatesh Seetharam
FALCON-182 Disable specifying partitions in inputs with table storage for process. Contributed by Venkatesh Seetharam
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/4fc7cd6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/4fc7cd6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/4fc7cd6c
Branch: refs/heads/FALCON-85
Commit: 4fc7cd6c7a1ec3e5ea3da50cb36fd10a24e449b7
Parents: 6f0731f
Author: srikanth.sundarrajan <sr...@inmobi.com>
Authored: Mon Nov 11 10:17:10 2013 +0530
Committer: srikanth.sundarrajan <sr...@inmobi.com>
Committed: Mon Nov 11 10:17:10 2013 +0530
----------------------------------------------------------------------
CHANGES.txt | 9 +++
.../falcon/entity/parser/FeedEntityParser.java | 43 +++++++++++--
.../entity/parser/ProcessEntityParser.java | 18 ++++--
.../entity/parser/FeedEntityParserTest.java | 22 +++++++
.../entity/parser/ProcessEntityParserTest.java | 14 +++++
.../feed/table-with-multiple-sources-feed.xml | 53 ++++++++++++++++
.../config/workflow/falcon-table-import.hql | 4 +-
.../TableStorageFeedReplicationIT.java | 65 ++++++++++++++++++--
8 files changed, 213 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4fc7cd6c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ff57658..536459a 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -109,6 +109,15 @@ Trunk (Unreleased)
Srikanth Sundarrajan)
BUG FIXES
+ FALCON-180 Disable table replication for multiple sources. (Venkatesh
+ Seetharam via Srikanth Sundarrajan)
+
+ FALCON-179 Table replication must drop partition before import as
+ late reruns fails. (Venkatesh Seetharam via Srikanth Sundarrajan)
+
+ FALCON-182 Disable specifying partitions in inputs with table storage
+ for process. (Venkatesh Seetharam via Srikanth Sundarrajan)
+
FALCON-138 Remove perf4j dependency. (Jean-Baptiste Onofré via
Shwetha GS)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4fc7cd6c/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index 0e687bd..8d7903b 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -161,9 +161,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
CrossEntityValidations.validateFeedRetentionPeriod(input.getStart(), newFeed, clusterName);
CrossEntityValidations.validateInstanceRange(process, input, newFeed);
- if (input.getPartition() != null) {
- CrossEntityValidations.validateInputPartition(input, newFeed);
- }
+ validateInputPartition(newFeed, input);
}
}
@@ -181,6 +179,19 @@ public class FeedEntityParser extends EntityParser<Feed> {
}
}
+ private void validateInputPartition(Feed newFeed, Input input) throws FalconException {
+ if (input.getPartition() == null) {
+ return;
+ }
+
+ final Storage.TYPE baseFeedStorageType = FeedHelper.getStorageType(newFeed);
+ if (baseFeedStorageType == Storage.TYPE.FILESYSTEM) {
+ CrossEntityValidations.validateInputPartition(input, newFeed);
+ } else if (baseFeedStorageType == Storage.TYPE.TABLE) {
+ throw new ValidationException("Input partitions are not supported for table storage: " + input.getName());
+ }
+ }
+
private void validateClusterValidity(Date start, Date end, String clusterName) throws FalconException {
try {
if (start.after(end)) {
@@ -288,12 +299,32 @@ public class FeedEntityParser extends EntityParser<Feed> {
* Does not matter for FileSystem storage.
*/
private void validateFeedStorage(Feed feed) throws FalconException {
- final Storage.TYPE storageType = FeedHelper.getStorageType(feed);
- validateUniformStorageType(feed, storageType);
- validatePartitions(feed, storageType);
+ final Storage.TYPE baseFeedStorageType = FeedHelper.getStorageType(feed);
+ validateMultipleSourcesExist(feed, baseFeedStorageType);
+ validateUniformStorageType(feed, baseFeedStorageType);
+ validatePartitions(feed, baseFeedStorageType);
validateStorageExists(feed);
}
+ private void validateMultipleSourcesExist(Feed feed, Storage.TYPE baseFeedStorageType) throws FalconException {
+ if (baseFeedStorageType == Storage.TYPE.FILESYSTEM) {
+ return;
+ }
+
+ // validate that there is only one source cluster
+ int numberOfSourceClusters = 0;
+ for (Cluster cluster : feed.getClusters().getClusters()) {
+ if (cluster.getType() == ClusterType.SOURCE) {
+ numberOfSourceClusters++;
+ }
+ }
+
+ if (numberOfSourceClusters > 1) {
+ throw new ValidationException("Multiple sources are not supported for feed with table storage: "
+ + feed.getName());
+ }
+ }
+
private void validateUniformStorageType(Feed feed, Storage.TYPE feedStorageType) throws FalconException {
for (Cluster cluster : feed.getClusters().getClusters()) {
Storage.TYPE feedClusterStorageType = FeedHelper.getStorageType(feed, cluster);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4fc7cd6c/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
index 81bfe0f..8647d43 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
@@ -79,10 +79,7 @@ public class ProcessEntityParser extends EntityParser<Process> {
CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName);
CrossEntityValidations.validateFeedRetentionPeriod(input.getStart(), feed, clusterName);
CrossEntityValidations.validateInstanceRange(process, input, feed);
- if (input.getPartition() != null) {
- CrossEntityValidations.validateInputPartition(input, feed);
- }
-
+ validateInputPartition(input, feed);
validateOptionalInputsForTableStorage(feed, input);
}
}
@@ -155,6 +152,19 @@ public class ProcessEntityParser extends EntityParser<Process> {
}
}
+ private void validateInputPartition(Input input, Feed feed) throws FalconException {
+ if (input.getPartition() == null) {
+ return;
+ }
+
+ final Storage.TYPE baseFeedStorageType = FeedHelper.getStorageType(feed);
+ if (baseFeedStorageType == Storage.TYPE.FILESYSTEM) {
+ CrossEntityValidations.validateInputPartition(input, feed);
+ } else if (baseFeedStorageType == Storage.TYPE.TABLE) {
+ throw new ValidationException("Input partitions are not supported for table storage: " + input.getName());
+ }
+ }
+
private void validateDatasetName(Inputs inputs, Outputs outputs) throws ValidationException {
Set<String> datasetNames = new HashSet<String>();
if (inputs != null) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4fc7cd6c/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
index 66bdf5c..b90713e 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
@@ -458,4 +458,26 @@ public class FeedEntityParserTest extends AbstractTestBase {
public void testParseInvalidFeedWithTable() throws FalconException {
parser.parse(FeedEntityParserTest.class.getResourceAsStream("/config/feed/invalid-feed.xml"));
}
+
+ @Test (expectedExceptions = FalconException.class)
+ public void testValidateFeedWithTableAndMultipleSources() throws FalconException {
+ parser.parseAndValidate(FeedEntityParserTest.class.getResourceAsStream(
+ "/config/feed/table-with-multiple-sources-feed.xml"));
+ Assert.fail("Should have thrown an exception:Multiple sources are not supported for feed with table storage");
+ }
+
+ @Test(expectedExceptions = ValidationException.class)
+ public void testValidatePartitionsForTable() throws Exception {
+ Feed feed = parser.parse(FeedEntityParserTest.class.getResourceAsStream("/config/feed/hive-table-feed.xml"));
+ Assert.assertNull(feed.getPartitions());
+
+ Partitions partitions = new Partitions();
+ Partition partition = new Partition();
+ partition.setName("colo");
+ partitions.getPartitions().add(partition);
+ feed.setPartitions(partitions);
+
+ parser.validate(feed);
+ Assert.fail("An exception should have been thrown:Partitions are not supported for feeds with table storage");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4fc7cd6c/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
index 4537bb3..e656772 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
@@ -315,4 +315,18 @@ public class ProcessEntityParserTest extends AbstractTestBase {
Assert.assertTrue(e instanceof ValidationException);
}
}
+
+ @Test(expectedExceptions = ValidationException.class)
+ public void testValidateInputPartitionForTable() throws Exception {
+ Process process = parser.parse(
+ ProcessEntityParserTest.class.getResourceAsStream("/config/process/process-table.xml"));
+ if (process.getInputs() != null) {
+ for (Input input : process.getInputs().getInputs()) {
+ input.setPartition("region=usa");
+ }
+ }
+
+ parser.validate(process);
+ Assert.fail("An exception should have been thrown since Input partitions are not supported for table storage");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4fc7cd6c/common/src/test/resources/config/feed/table-with-multiple-sources-feed.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/table-with-multiple-sources-feed.xml b/common/src/test/resources/config/feed/table-with-multiple-sources-feed.xml
new file mode 100644
index 0000000..f84f3d4
--- /dev/null
+++ b/common/src/test/resources/config/feed/table-with-multiple-sources-feed.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<feed description="clicks log" name="clicks" xmlns="uri:falcon:feed:0.1">
+ <partitions>
+ <partition name="fraud"/>
+ <partition name="good"/>
+ </partitions>
+
+ <groups>online,bi</groups>
+
+ <frequency>hours(1)</frequency>
+ <timezone>UTC</timezone>
+ <late-arrival cut-off="hours(6)"/>
+
+ <clusters>
+ <cluster name="testCluster" type="source">
+ <validity start="2021-11-01T00:00Z" end="2021-12-31T00:00Z"/>
+ <retention limit="hours(48)" action="delete"/>
+ <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+ </cluster>
+ <cluster name="testCluster" type="source">
+ <validity start="2021-11-01T00:00Z" end="2021-12-31T00:00Z"/>
+ <retention limit="hours(48)" action="delete"/>
+ <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+ </cluster>
+ <cluster name="backupCluster" type="target">
+ <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+ <retention limit="hours(6)" action="archive"/>
+ <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+ </cluster>
+ </clusters>
+
+ <table uri="catalog:default:clicks#ds=$YEAR-$MONTH-$DAY-$HOUR" />
+
+ <ACL owner="testuser" group="group" permission="0x755"/>
+ <schema location="/schema/clicks" provider="protobuf"/>
+</feed>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4fc7cd6c/feed/src/main/resources/config/workflow/falcon-table-import.hql
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/falcon-table-import.hql b/feed/src/main/resources/config/workflow/falcon-table-import.hql
index 917b5b9..653d580 100644
--- a/feed/src/main/resources/config/workflow/falcon-table-import.hql
+++ b/feed/src/main/resources/config/workflow/falcon-table-import.hql
@@ -15,4 +15,6 @@
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
-import table ${falconTargetDatabase}.${falconTargetTable} partition ${falconTargetPartition} from '${falconTargetStagingDir}';
+use ${falconTargetDatabase};
+alter table ${falconTargetTable} drop if exists partition ${falconTargetPartition};
+import table ${falconTargetTable} partition ${falconTargetPartition} from '${falconTargetStagingDir}';
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/4fc7cd6c/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedReplicationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedReplicationIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedReplicationIT.java
index 256d3b5..dbc6442 100644
--- a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedReplicationIT.java
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedReplicationIT.java
@@ -119,10 +119,6 @@ public class TableStorageFeedReplicationIT {
@AfterClass
public void tearDown() throws Exception {
- TestContext.executeWithURL("entity -delete -type feed -name customer-table-replicating-feed");
- TestContext.executeWithURL("entity -delete -type cluster -name primary-cluster");
- TestContext.executeWithURL("entity -delete -type cluster -name bcp-cluster");
-
cleanupHiveMetastore(sourceMetastoreUrl, SOURCE_DATABASE_NAME, SOURCE_TABLE_NAME);
cleanupHiveMetastore(targetMetastoreUrl, TARGET_DATABASE_NAME, TARGET_TABLE_NAME);
@@ -176,5 +172,66 @@ public class TableStorageFeedReplicationIT {
.accept(MediaType.APPLICATION_JSON)
.get(InstancesResult.class);
Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED);
+
+ TestContext.executeWithURL("entity -delete -type feed -name customer-table-replicating-feed");
+ TestContext.executeWithURL("entity -delete -type cluster -name primary-cluster");
+ TestContext.executeWithURL("entity -delete -type cluster -name bcp-cluster");
+ }
+
+ @Test (enabled = false)
+ public void testTableReplicationWithExistingTargetPartition() throws Exception {
+ final String feedName = "customer-table-replicating-feed";
+ final Map<String, String> overlay = sourceContext.getUniqueOverlay();
+ String filePath = sourceContext.overlayParametersOverTemplate("/table/primary-cluster.xml", overlay);
+ Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+ filePath = targetContext.overlayParametersOverTemplate("/table/bcp-cluster.xml", overlay);
+ Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+ HCatPartition sourcePartition = HiveTestUtils.getPartition(
+ sourceMetastoreUrl, SOURCE_DATABASE_NAME, SOURCE_TABLE_NAME, "ds", PARTITION_VALUE);
+ Assert.assertNotNull(sourcePartition);
+
+ addPartitionToTarget();
+ // verify if the partition on the target exists before replication starts
+ // to see import drops partition before importing partition
+ HCatPartition targetPartition = HiveTestUtils.getPartition(
+ targetMetastoreUrl, TARGET_DATABASE_NAME, TARGET_TABLE_NAME, "ds", PARTITION_VALUE);
+ Assert.assertNotNull(targetPartition);
+
+ filePath = sourceContext.overlayParametersOverTemplate("/table/customer-table-replicating-feed.xml", overlay);
+ Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
+
+ // wait until the workflow job completes
+ WorkflowJob jobInfo = OozieTestUtils.getWorkflowJob(targetContext.getCluster().getCluster(),
+ OozieClient.FILTER_NAME + "=FALCON_FEED_REPLICATION_" + feedName);
+ Assert.assertEquals(jobInfo.getStatus(), WorkflowJob.Status.SUCCEEDED);
+
+ // verify if the partition on the target exists
+ targetPartition = HiveTestUtils.getPartition(
+ targetMetastoreUrl, TARGET_DATABASE_NAME, TARGET_TABLE_NAME, "ds", PARTITION_VALUE);
+ Assert.assertNotNull(targetPartition);
+
+ InstancesResult response = targetContext.getService().path("api/instance/running/feed/" + feedName)
+ .header("Remote-User", "guest")
+ .accept(MediaType.APPLICATION_JSON)
+ .get(InstancesResult.class);
+ Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED);
+
+ TestContext.executeWithURL("entity -delete -type feed -name customer-table-replicating-feed");
+ TestContext.executeWithURL("entity -delete -type cluster -name primary-cluster");
+ TestContext.executeWithURL("entity -delete -type cluster -name bcp-cluster");
+ }
+
+ private void addPartitionToTarget() throws Exception {
+ final Cluster targetCluster = targetContext.getCluster().getCluster();
+ String targetStorageUrl = ClusterHelper.getStorageUrl(targetCluster);
+
+ // copyTestDataToHDFS
+ final String targetPath = targetStorageUrl + "/falcon/test/input/" + PARTITION_VALUE;
+ FSUtils.copyResourceToHDFS("/apps/data/data.txt", "data.txt", targetPath);
+
+ HiveTestUtils.loadData(targetMetastoreUrl, TARGET_DATABASE_NAME, TARGET_TABLE_NAME,
+ targetPath, PARTITION_VALUE);
}
}
[2/2] git commit: Merge branch 'FALCON-85' of
https://git-wip-us.apache.org/repos/asf/incubator-falcon into FALCON-85
Posted by sr...@apache.org.
Merge branch 'FALCON-85' of https://git-wip-us.apache.org/repos/asf/incubator-falcon into FALCON-85
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/38278154
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/38278154
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/38278154
Branch: refs/heads/FALCON-85
Commit: 38278154998073f8cb24ef9db0716478275c2db9
Parents: 4fc7cd6 b7e678b
Author: srikanth.sundarrajan <sr...@inmobi.com>
Authored: Mon Nov 11 10:18:51 2013 +0530
Committer: srikanth.sundarrajan <sr...@inmobi.com>
Committed: Mon Nov 11 10:18:51 2013 +0530
----------------------------------------------------------------------
CHANGES.txt | 6 +
.../apache/falcon/entity/FileSystemStorage.java | 7 +-
.../falcon/entity/FileSystemStorageTest.java | 24 +++-
.../falcon/converter/OozieFeedMapper.java | 14 ++-
.../falcon/converter/OozieFeedMapperTest.java | 126 +++++++++++++++++--
feed/src/test/resources/fs-replication-feed.xml | 64 ++++++++++
feed/src/test/resources/trg-cluster-alpha.xml | 39 ++++++
feed/src/test/resources/trg-cluster-beta.xml | 39 ++++++
.../lifecycle/FileSystemFeedReplicationIT.java | 67 +++++++++-
.../table/complex-replicating-feed.xml | 71 +++++++++++
.../resources/table/target-cluster-alpha.xml | 2 +-
.../resources/table/target-cluster-beta.xml | 2 +-
12 files changed, 435 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/38278154/CHANGES.txt
----------------------------------------------------------------------