You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sh...@apache.org on 2013/08/14 08:22:49 UTC
git commit: FALCON-72 Feeds with invalid oozie URI in cluster cannot
be deleted. Contributed by Venkatesh Seetharam
Updated Branches:
refs/heads/master 9aa2c63a7 -> 33a68b7a6
FALCON-72 Feeds with invalid oozie URI in cluster cannot be deleted. Contributed by Venkatesh Seetharam
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/33a68b7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/33a68b7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/33a68b7a
Branch: refs/heads/master
Commit: 33a68b7a6f1d0183acb102e825329fdb83c04bdd
Parents: 9aa2c63
Author: Shwetha GS <sh...@inmobi.com>
Authored: Wed Aug 14 11:56:46 2013 +0530
Committer: Shwetha GS <sh...@inmobi.com>
Committed: Wed Aug 14 11:56:46 2013 +0530
----------------------------------------------------------------------
CHANGES.txt | 3 +
common/pom.xml | 5 +
.../entity/parser/ClusterEntityParser.java | 107 +++++++++++++--
.../workflow/engine/AbstractWorkflowEngine.java | 3 +
.../entity/parser/ClusterEntityParserTest.java | 1 -
.../workflow/engine/OozieWorkflowEngine.java | 10 ++
.../resource/ClusterEntityValidationIT.java | 130 +++++++++++++++++++
7 files changed, 248 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33a68b7a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 45dd9f9..9ec2b36 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,9 @@ Trunk (Unreleased)
Srikanth Sundarrajan)
BUG FIXES
+ FALCON-72 Feeds with invalid oozie URI in cluster cannot be deleted.
+ (Venkatesh Seetharam vi Shwetha GS)
+
FALCON-78 Falcon error when prism on one hadoop version and server
on another. (Shwetha GS via Srikanth Sundarrajan)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33a68b7a/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index 00d1f28..498320a 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -102,6 +102,11 @@
<groupId>net.sourceforge.findbugs</groupId>
<artifactId>annotations</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>javax.jms</groupId>
+ <artifactId>jms</artifactId>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33a68b7a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
index 381d321..242fdfb 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
@@ -18,16 +18,25 @@
package org.apache.falcon.entity.parser;
+import org.apache.falcon.FalconException;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.store.StoreAccessException;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.util.DeploymentUtil;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowEngineFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.Logger;
+import javax.jms.ConnectionFactory;
+import java.io.IOException;
+
/**
* Parser that parses cluster entity definition.
*/
@@ -42,26 +51,104 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
@Override
public void validate(Cluster cluster) throws StoreAccessException,
ValidationException {
- if (new Path(ClusterHelper.getStorageUrl(cluster)).toUri().getScheme() == null) {
- throw new ValidationException(
- "Cannot get valid scheme for namenode from write interface of cluster: "
- + cluster.getName());
- }
+ // validating scheme in light of fail-early
+ validateScheme(cluster, Interfacetype.READONLY);
+ validateScheme(cluster, Interfacetype.WRITE);
+ validateScheme(cluster, Interfacetype.WORKFLOW);
+ validateScheme(cluster, Interfacetype.MESSAGING);
- //No filesystem validations in prism or other falcon servers. Only the falcon server for which
- // the cluster belongs to should check filesystem
+ // No interface validations in prism or other falcon servers.
+ // Only the falcon server for which the cluster belongs to should validate interfaces
if (DeploymentUtil.isPrism() || !cluster.getColo().equals(DeploymentUtil.getCurrentColo())) {
+ LOG.info("No interface validations in prism or falcon servers not applicable.");
return;
}
+ validateReadInterface(cluster);
+ validateWriteInterface(cluster);
+ validateExecuteInterface(cluster);
+ validateWorkflowInterface(cluster);
+ validateMessagingInterface(cluster);
+
+ // Interfacetype.REGISTRY is not validated as its not used
+ }
+
+ private void validateScheme(Cluster cluster, Interfacetype interfacetype)
+ throws ValidationException {
+ final String endpoint = ClusterHelper.getInterface(cluster, interfacetype).getEndpoint();
+ if (new Path(endpoint).toUri().getScheme() == null) {
+ throw new ValidationException("Cannot get valid scheme for interface: "
+ + interfacetype + " of cluster: " + cluster.getName());
+ }
+ }
+
+ private void validateReadInterface(Cluster cluster) throws ValidationException {
+ final String readOnlyStorageUrl = ClusterHelper.getReadOnlyStorageUrl(cluster);
+ LOG.info("Validating read interface: " + readOnlyStorageUrl);
+
+ validateFileSystem(readOnlyStorageUrl);
+ }
+
+ private void validateWriteInterface(Cluster cluster) throws ValidationException {
+ final String writeStorageUrl = ClusterHelper.getStorageUrl(cluster);
+ LOG.info("Validating write interface: " + writeStorageUrl);
+
+ validateFileSystem(writeStorageUrl);
+ }
+
+ private void validateFileSystem(String storageUrl) throws ValidationException {
try {
Configuration conf = new Configuration();
- conf.set("fs.default.name", ClusterHelper.getStorageUrl(cluster));
+ conf.set("fs.default.name", storageUrl);
conf.setInt("ipc.client.connect.max.retries", 10);
FileSystem.get(conf);
+ } catch (IOException e) {
+ throw new ValidationException("Invalid storage server or port: " + storageUrl, e);
+ }
+ }
+
+ private void validateExecuteInterface(Cluster cluster) throws ValidationException {
+ String executeUrl = ClusterHelper.getMREndPoint(cluster);
+ LOG.info("Validating execute interface: " + executeUrl);
+
+ try {
+ JobConf jobConf = new JobConf();
+ jobConf.set("mapred.job.tracker", executeUrl);
+ new JobClient(jobConf);
+ } catch (IOException e) {
+ throw new ValidationException("Invalid Execute server or port: " + executeUrl, e);
+ }
+ }
+
+ private void validateWorkflowInterface(Cluster cluster) throws ValidationException {
+ final String workflowUrl = ClusterHelper.getOozieUrl(cluster);
+ LOG.info("Validating workflow interface: " + workflowUrl);
+
+ try {
+ if (!WorkflowEngineFactory.getWorkflowEngine().isAlive(cluster)) {
+ throw new ValidationException("Unable to reach Workflow server:" + workflowUrl);
+ }
+ } catch (FalconException e) {
+ throw new ValidationException("Invalid Workflow server or port: " + workflowUrl, e);
+ }
+ }
+
+ private void validateMessagingInterface(Cluster cluster) throws ValidationException {
+ final String messagingUrl = ClusterHelper.getMessageBrokerUrl(cluster);
+ final String implementation = StartupProperties.get().getProperty(
+ "broker.impl.class", "org.apache.activemq.ActiveMQConnectionFactory");
+ LOG.info("Validating messaging interface: " + messagingUrl + ", implementation: " + implementation);
+
+ try {
+ @SuppressWarnings("unchecked")
+ Class<ConnectionFactory> clazz = (Class<ConnectionFactory>)
+ getClass().getClassLoader().loadClass(implementation);
+ ConnectionFactory connectionFactory = clazz.getConstructor(
+ String.class, String.class, String.class).newInstance("", "", messagingUrl);
+ connectionFactory.createConnection();
} catch (Exception e) {
- throw new ValidationException("Invalid HDFS server or port:"
- + ClusterHelper.getStorageUrl(cluster), e);
+ throw new ValidationException("Invalid Messaging server or port: " + messagingUrl
+ + " for: " + implementation, e);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33a68b7a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index f7526e4..17695d2 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -20,6 +20,7 @@ package org.apache.falcon.workflow.engine;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.resource.InstancesResult;
import java.util.Date;
@@ -42,6 +43,8 @@ public abstract class AbstractWorkflowEngine {
listeners.add(listener);
}
+ public abstract boolean isAlive(Cluster cluster) throws FalconException;
+
public abstract void schedule(Entity entity) throws FalconException;
public abstract String suspend(Entity entity) throws FalconException;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33a68b7a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
index 5c25ea4..f7d03e3 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
@@ -80,7 +80,6 @@ public class ClusterEntityParserTest extends AbstractTestBase {
Marshaller marshaller = EntityType.CLUSTER.getMarshaller();
marshaller.marshal(cluster, stringWriter);
System.out.println(stringWriter.toString());
- parser.parseAndValidate(stringWriter.toString());
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33a68b7a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 28a60c2..d9d4124 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -24,6 +24,7 @@ import org.apache.falcon.Tag;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.*;
import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.resource.InstancesResult.Instance;
@@ -83,6 +84,15 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
@Override
+ public boolean isAlive(Cluster cluster) throws FalconException {
+ try {
+ return OozieClientFactory.get(cluster).getSystemMode() == OozieClient.SYSTEM_MODE.NORMAL;
+ } catch (OozieClientException e) {
+ throw new FalconException("Unable to reach Oozie server.", e);
+ }
+ }
+
+ @Override
public void schedule(Entity entity) throws FalconException {
Map<String, BundleJob> bundleMap = findLatestBundle(entity);
List<String> schedClusters = new ArrayList<String>();
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33a68b7a/webapp/src/test/java/org/apache/falcon/resource/ClusterEntityValidationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/ClusterEntityValidationIT.java b/webapp/src/test/java/org/apache/falcon/resource/ClusterEntityValidationIT.java
new file mode 100644
index 0000000..58e9c08
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/resource/ClusterEntityValidationIT.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.parser.ClusterEntityParser;
+import org.apache.falcon.entity.parser.EntityParserFactory;
+import org.apache.falcon.entity.parser.ValidationException;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interface;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import javax.xml.bind.Marshaller;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.util.Map;
+
+/**
+ * Tests cluster entity validation to verify if each of the specified
+ * interface endpoints are valid.
+ */
+public class ClusterEntityValidationIT {
+ private final ClusterEntityParser parser =
+ (ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER);
+
+ private final TestContext context = new TestContext();
+ private Map<String, String> overlay;
+
+
+ @BeforeClass
+ public void setup() throws Exception {
+ TestContext.prepare();
+ startBroker();
+
+ overlay = context.getUniqueOverlay();
+ }
+
+ private void startBroker() throws Exception {
+ BrokerService broker = new BrokerService();
+ broker.setUseJmx(false);
+ broker.setDataDirectory("target/data");
+ broker.addConnector("vm://localhost");
+ broker.addConnector("tcp://localhost:61616");
+ broker.start();
+ }
+
+ /**
+ * Positive test.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testClusterEntityWithValidInterfaces() throws Exception {
+
+ String filePath = context.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
+ InputStream stream = new FileInputStream(filePath);
+ Cluster cluster = parser.parse(stream);
+ Assert.assertNotNull(cluster);
+ cluster.setColo("default"); // validations will be ignored if not default & tests fail
+
+ StringWriter stringWriter = new StringWriter();
+ Marshaller marshaller = EntityType.CLUSTER.getMarshaller();
+ marshaller.marshal(cluster, stringWriter);
+ System.out.println(stringWriter.toString());
+ parser.parseAndValidate(stringWriter.toString());
+ }
+
+
+ @DataProvider(name = "interfaceToInvalidURLs")
+ public Object[][] createInterfaceToInvalidURLData() {
+ return new Object[][] {
+ // todo FileSystem validates invalid hftp url, does NOT fail
+ // {Interfacetype.READONLY, "hftp://localhost:41119"},
+ {Interfacetype.READONLY, ""},
+ {Interfacetype.READONLY, "localhost:41119"},
+ {Interfacetype.WRITE, "write-interface:9999"},
+ {Interfacetype.WRITE, "hdfs://write-interface:9999"},
+ {Interfacetype.EXECUTE, "execute-interface:9999"},
+ {Interfacetype.WORKFLOW, "workflow-interface:9999/oozie/"},
+ {Interfacetype.WORKFLOW, "http://workflow-interface:9999/oozie/"},
+ {Interfacetype.MESSAGING, "messaging-interface:9999"},
+ {Interfacetype.MESSAGING, "tcp://messaging-interface:9999"},
+ };
+ }
+
+ @Test (dataProvider = "interfaceToInvalidURLs",
+ expectedExceptions = {ValidationException.class, IllegalArgumentException.class})
+ public void testClusterEntityWithInvalidInterfaces(Interfacetype interfacetype, String endpoint)
+ throws Exception {
+ String filePath = context.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
+ InputStream stream = new FileInputStream(filePath);
+ Cluster cluster = parser.parse(stream);
+ Assert.assertNotNull(cluster);
+ cluster.setColo("default"); // validations will be ignored if not default & tests fail
+
+ Interface anInterface = ClusterHelper.getInterface(cluster, interfacetype);
+ anInterface.setEndpoint(endpoint);
+
+ StringWriter stringWriter = new StringWriter();
+ Marshaller marshaller = EntityType.CLUSTER.getMarshaller();
+ marshaller.marshal(cluster, stringWriter);
+ System.out.println(stringWriter.toString());
+ parser.parseAndValidate(stringWriter.toString());
+ Assert.fail("Validation exception must have been thrown for an invalid interface: "
+ + interfacetype + ", URL: " + endpoint);
+ }
+}