You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sh...@apache.org on 2013/08/14 08:22:49 UTC

git commit: FALCON-72 Feeds with invalid oozie URI in cluster cannot be deleted. Contributed by Venkatesh Seetharam

Updated Branches:
  refs/heads/master 9aa2c63a7 -> 33a68b7a6


FALCON-72 Feeds with invalid oozie URI in cluster cannot be deleted. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/33a68b7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/33a68b7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/33a68b7a

Branch: refs/heads/master
Commit: 33a68b7a6f1d0183acb102e825329fdb83c04bdd
Parents: 9aa2c63
Author: Shwetha GS <sh...@inmobi.com>
Authored: Wed Aug 14 11:56:46 2013 +0530
Committer: Shwetha GS <sh...@inmobi.com>
Committed: Wed Aug 14 11:56:46 2013 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 common/pom.xml                                  |   5 +
 .../entity/parser/ClusterEntityParser.java      | 107 +++++++++++++--
 .../workflow/engine/AbstractWorkflowEngine.java |   3 +
 .../entity/parser/ClusterEntityParserTest.java  |   1 -
 .../workflow/engine/OozieWorkflowEngine.java    |  10 ++
 .../resource/ClusterEntityValidationIT.java     | 130 +++++++++++++++++++
 7 files changed, 248 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33a68b7a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 45dd9f9..9ec2b36 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,9 @@ Trunk (Unreleased)
     Srikanth Sundarrajan)
 
   BUG FIXES
+    FALCON-72 Feeds with invalid oozie URI in cluster cannot be deleted.
+    (Venkatesh Seetharam via Shwetha GS)
+
     FALCON-78 Falcon error when prism on one hadoop version and server 
     on another. (Shwetha GS via Srikanth Sundarrajan)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33a68b7a/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index 00d1f28..498320a 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -102,6 +102,11 @@
             <groupId>net.sourceforge.findbugs</groupId>
             <artifactId>annotations</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>javax.jms</groupId>
+            <artifactId>jms</artifactId>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33a68b7a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
index 381d321..242fdfb 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
@@ -18,16 +18,25 @@
 
 package org.apache.falcon.entity.parser;
 
+import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.store.StoreAccessException;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.util.DeploymentUtil;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowEngineFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.log4j.Logger;
 
+import javax.jms.ConnectionFactory;
+import java.io.IOException;
+
 /**
  * Parser that parses cluster entity definition.
  */
@@ -42,26 +51,104 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
     @Override
     public void validate(Cluster cluster) throws StoreAccessException,
                                                  ValidationException {
-        if (new Path(ClusterHelper.getStorageUrl(cluster)).toUri().getScheme() == null) {
-            throw new ValidationException(
-                    "Cannot get valid scheme for namenode from write interface of cluster: "
-                            + cluster.getName());
-        }
+        // validating scheme in light of fail-early
+        validateScheme(cluster, Interfacetype.READONLY);
+        validateScheme(cluster, Interfacetype.WRITE);
+        validateScheme(cluster, Interfacetype.WORKFLOW);
+        validateScheme(cluster, Interfacetype.MESSAGING);
 
-        //No filesystem validations in prism or other falcon servers. Only the falcon server for which
-        // the cluster belongs to should check filesystem
+        // No interface validations in prism or other falcon servers.
+        // Only the falcon server to which the cluster belongs should validate interfaces
         if (DeploymentUtil.isPrism() || !cluster.getColo().equals(DeploymentUtil.getCurrentColo())) {
+            LOG.info("No interface validations in prism or falcon servers not applicable.");
             return;
         }
 
+        validateReadInterface(cluster);
+        validateWriteInterface(cluster);
+        validateExecuteInterface(cluster);
+        validateWorkflowInterface(cluster);
+        validateMessagingInterface(cluster);
+
+        // Interfacetype.REGISTRY is not validated as it is not used
+    }
+
+    private void validateScheme(Cluster cluster, Interfacetype interfacetype)
+        throws ValidationException {
+        final String endpoint = ClusterHelper.getInterface(cluster, interfacetype).getEndpoint();
+        if (new Path(endpoint).toUri().getScheme() == null) {
+            throw new ValidationException("Cannot get valid scheme for interface: "
+                    + interfacetype + " of cluster: " + cluster.getName());
+        }
+    }
+
+    private void validateReadInterface(Cluster cluster) throws ValidationException {
+        final String readOnlyStorageUrl = ClusterHelper.getReadOnlyStorageUrl(cluster);
+        LOG.info("Validating read interface: " + readOnlyStorageUrl);
+
+        validateFileSystem(readOnlyStorageUrl);
+    }
+
+    private void validateWriteInterface(Cluster cluster) throws ValidationException {
+        final String writeStorageUrl = ClusterHelper.getStorageUrl(cluster);
+        LOG.info("Validating write interface: " + writeStorageUrl);
+
+        validateFileSystem(writeStorageUrl);
+    }
+
+    private void validateFileSystem(String storageUrl) throws ValidationException {
         try {
             Configuration conf = new Configuration();
-            conf.set("fs.default.name", ClusterHelper.getStorageUrl(cluster));
+            conf.set("fs.default.name", storageUrl);
             conf.setInt("ipc.client.connect.max.retries", 10);
             FileSystem.get(conf);
+        } catch (IOException e) {
+            throw new ValidationException("Invalid storage server or port: " + storageUrl, e);
+        }
+    }
+
+    private void validateExecuteInterface(Cluster cluster) throws ValidationException {
+        String executeUrl = ClusterHelper.getMREndPoint(cluster);
+        LOG.info("Validating execute interface: " + executeUrl);
+
+        try {
+            JobConf jobConf = new JobConf();
+            jobConf.set("mapred.job.tracker", executeUrl);
+            new JobClient(jobConf);
+        } catch (IOException e) {
+            throw new ValidationException("Invalid Execute server or port: " + executeUrl, e);
+        }
+    }
+
+    private void validateWorkflowInterface(Cluster cluster) throws ValidationException {
+        final String workflowUrl = ClusterHelper.getOozieUrl(cluster);
+        LOG.info("Validating workflow interface: " + workflowUrl);
+
+        try {
+            if (!WorkflowEngineFactory.getWorkflowEngine().isAlive(cluster)) {
+                throw new ValidationException("Unable to reach Workflow server:" + workflowUrl);
+            }
+        } catch (FalconException e) {
+            throw new ValidationException("Invalid Workflow server or port: " + workflowUrl, e);
+        }
+    }
+
+    private void validateMessagingInterface(Cluster cluster) throws ValidationException {
+        final String messagingUrl = ClusterHelper.getMessageBrokerUrl(cluster);
+        final String implementation = StartupProperties.get().getProperty(
+                "broker.impl.class", "org.apache.activemq.ActiveMQConnectionFactory");
+        LOG.info("Validating messaging interface: " + messagingUrl + ", implementation: " + implementation);
+
+        try {
+            @SuppressWarnings("unchecked")
+            Class<ConnectionFactory> clazz = (Class<ConnectionFactory>)
+                    getClass().getClassLoader().loadClass(implementation);
+            ConnectionFactory connectionFactory = clazz.getConstructor(
+                    String.class, String.class, String.class).newInstance("", "", messagingUrl);
+            connectionFactory.createConnection();
         } catch (Exception e) {
-            throw new ValidationException("Invalid HDFS server or port:"
-                    + ClusterHelper.getStorageUrl(cluster), e);
+            throw new ValidationException("Invalid Messaging server or port: " + messagingUrl
+                    + " for: " + implementation, e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33a68b7a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index f7526e4..17695d2 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -20,6 +20,7 @@ package org.apache.falcon.workflow.engine;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.resource.InstancesResult;
 
 import java.util.Date;
@@ -42,6 +43,8 @@ public abstract class AbstractWorkflowEngine {
         listeners.add(listener);
     }
 
+    public abstract boolean isAlive(Cluster cluster) throws FalconException;
+
     public abstract void schedule(Entity entity) throws FalconException;
 
     public abstract String suspend(Entity entity) throws FalconException;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33a68b7a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
index 5c25ea4..f7d03e3 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
@@ -80,7 +80,6 @@ public class ClusterEntityParserTest extends AbstractTestBase {
         Marshaller marshaller = EntityType.CLUSTER.getMarshaller();
         marshaller.marshal(cluster, stringWriter);
         System.out.println(stringWriter.toString());
-        parser.parseAndValidate(stringWriter.toString());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33a68b7a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 28a60c2..d9d4124 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -24,6 +24,7 @@ import org.apache.falcon.Tag;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.*;
 import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.resource.InstancesResult.Instance;
@@ -83,6 +84,15 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     @Override
+    public boolean isAlive(Cluster cluster) throws FalconException {
+        try {
+            return OozieClientFactory.get(cluster).getSystemMode() == OozieClient.SYSTEM_MODE.NORMAL;
+        } catch (OozieClientException e) {
+            throw new FalconException("Unable to reach Oozie server.", e);
+        }
+    }
+
+    @Override
     public void schedule(Entity entity) throws FalconException {
         Map<String, BundleJob> bundleMap = findLatestBundle(entity);
         List<String> schedClusters = new ArrayList<String>();

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33a68b7a/webapp/src/test/java/org/apache/falcon/resource/ClusterEntityValidationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/ClusterEntityValidationIT.java b/webapp/src/test/java/org/apache/falcon/resource/ClusterEntityValidationIT.java
new file mode 100644
index 0000000..58e9c08
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/resource/ClusterEntityValidationIT.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.parser.ClusterEntityParser;
+import org.apache.falcon.entity.parser.EntityParserFactory;
+import org.apache.falcon.entity.parser.ValidationException;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interface;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import javax.xml.bind.Marshaller;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.util.Map;
+
+/**
+ * Tests cluster entity validation to verify that each of the specified
+ * interface endpoints is valid.
+ */
+public class ClusterEntityValidationIT {
+    private final ClusterEntityParser parser =
+            (ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER);
+
+    private final TestContext context = new TestContext();
+    private Map<String, String> overlay;
+
+
+    @BeforeClass
+    public void setup() throws Exception {
+        TestContext.prepare();
+        startBroker();
+
+        overlay = context.getUniqueOverlay();
+    }
+
+    private void startBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setUseJmx(false);
+        broker.setDataDirectory("target/data");
+        broker.addConnector("vm://localhost");
+        broker.addConnector("tcp://localhost:61616");
+        broker.start();
+    }
+
+    /**
+     * Positive test.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testClusterEntityWithValidInterfaces() throws Exception {
+
+        String filePath = context.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
+        InputStream stream = new FileInputStream(filePath);
+        Cluster cluster = parser.parse(stream);
+        Assert.assertNotNull(cluster);
+        cluster.setColo("default");  // validations will be ignored if not default & tests fail
+
+        StringWriter stringWriter = new StringWriter();
+        Marshaller marshaller = EntityType.CLUSTER.getMarshaller();
+        marshaller.marshal(cluster, stringWriter);
+        System.out.println(stringWriter.toString());
+        parser.parseAndValidate(stringWriter.toString());
+    }
+
+
+    @DataProvider(name = "interfaceToInvalidURLs")
+    public Object[][] createInterfaceToInvalidURLData() {
+        return new Object[][] {
+            // todo FileSystem validates invalid hftp url, does NOT fail
+            // {Interfacetype.READONLY, "hftp://localhost:41119"},
+            {Interfacetype.READONLY, ""},
+            {Interfacetype.READONLY, "localhost:41119"},
+            {Interfacetype.WRITE, "write-interface:9999"},
+            {Interfacetype.WRITE, "hdfs://write-interface:9999"},
+            {Interfacetype.EXECUTE, "execute-interface:9999"},
+            {Interfacetype.WORKFLOW, "workflow-interface:9999/oozie/"},
+            {Interfacetype.WORKFLOW, "http://workflow-interface:9999/oozie/"},
+            {Interfacetype.MESSAGING, "messaging-interface:9999"},
+            {Interfacetype.MESSAGING, "tcp://messaging-interface:9999"},
+        };
+    }
+
+    @Test (dataProvider = "interfaceToInvalidURLs",
+           expectedExceptions = {ValidationException.class, IllegalArgumentException.class})
+    public void testClusterEntityWithInvalidInterfaces(Interfacetype interfacetype, String endpoint)
+        throws Exception {
+        String filePath = context.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
+        InputStream stream = new FileInputStream(filePath);
+        Cluster cluster = parser.parse(stream);
+        Assert.assertNotNull(cluster);
+        cluster.setColo("default");  // validations will be ignored if not default & tests fail
+
+        Interface anInterface = ClusterHelper.getInterface(cluster, interfacetype);
+        anInterface.setEndpoint(endpoint);
+
+        StringWriter stringWriter = new StringWriter();
+        Marshaller marshaller = EntityType.CLUSTER.getMarshaller();
+        marshaller.marshal(cluster, stringWriter);
+        System.out.println(stringWriter.toString());
+        parser.parseAndValidate(stringWriter.toString());
+        Assert.fail("Validation exception must have been thrown for an invalid interface: "
+                + interfacetype + ", URL: " + endpoint);
+    }
+}