You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/10/10 06:33:13 UTC

incubator-eagle git commit: [EAGLE-593] Eagle Topology assembly multi-version Hadoop support & bug fix

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 9e8737705 -> 17316124a


[EAGLE-593] Eagle Topology assembly multi-version Hadoop support & bug fix

1. Users may want to support different versions of Hadoop in Eagle JPM when
using the topology assembly as the final package. For this requirement, we
add profiles to the eagle-topology-assembly pom.xml. In this pom, we can
add different versions of Hadoop.
Fix: StormSubmitter does not support submitting another jar. In Storm's
StormSubmitter:

2. private static String submittedJar = null; private static void
submitJar(Map conf, ProgressListener listener) { if(submittedJar==null)
{ LOG.info("Jar not uploaded to master yet. Submitting jar..."); String
localJar = System.getProperty("storm.jar"); submittedJar =
submitJar(conf, localJar, listener); } else { LOG.info("Jar already
uploaded to master. Not submitting jar."); } }

3. It cannot submit another jar once submittedJar is set, so I rewrote
this class.
Fix: the MR Running Job application did not set the state of finished jobs.

Closes #481

Closes #155
Closes #135
Closes #61

Eagle Topology assembly multi-version Hadoop support & bug fix

Eagle Topology assembly multi-version Hadoop support & bug fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/17316124
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/17316124
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/17316124

Branch: refs/heads/master
Commit: 17316124af9d82e07c0589550854c274d0cb1059
Parents: 9e87377
Author: wujinhu <wu...@126.com>
Authored: Mon Oct 10 10:27:59 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Mon Oct 10 14:27:28 2016 +0800

----------------------------------------------------------------------
 eagle-core/eagle-app/eagle-app-base/pom.xml     |   5 +
 .../environment/impl/StormExecutionRuntime.java |   1 -
 .../app/environment/impl/StormSubmitter.java    | 320 +++++++++++++++++++
 eagle-core/eagle-common/pom.xml                 |  10 +
 eagle-jpm/eagle-jpm-mr-history/pom.xml          |  35 --
 .../running/storm/MRRunningJobFetchSpout.java   |   4 +-
 eagle-jpm/eagle-jpm-spark-history/pom.xml       |  35 --
 eagle-jpm/eagle-jpm-spark-running/pom.xml       |  41 ---
 eagle-jpm/eagle-jpm-util/pom.xml                |   5 -
 .../jpm/util/jobrecover/RunningJobManager.java  |   8 +-
 eagle-jpm/pom.xml                               |  10 +
 eagle-server/pom.xml                            |   2 +-
 eagle-topology-assembly/pom.xml                 |  99 ++++++
 pom.xml                                         |   2 +
 14 files changed, 454 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/17316124/eagle-core/eagle-app/eagle-app-base/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/pom.xml b/eagle-core/eagle-app/eagle-app-base/pom.xml
index 9b0b29e..b3c9202 100644
--- a/eagle-core/eagle-app/eagle-app-base/pom.xml
+++ b/eagle-core/eagle-app/eagle-app-base/pom.xml
@@ -62,6 +62,11 @@
             <version>1.1.1</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.thrift</groupId>
+            <artifactId>libthrift</artifactId>
+            <version>${thrift.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.eagle</groupId>
             <artifactId>alert-metadata</artifactId>
             <version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/17316124/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
index 7817c73..5f74d01 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
@@ -24,7 +24,6 @@ import org.apache.eagle.app.utils.DynamicJarPathFinder;
 import org.apache.eagle.metadata.model.ApplicationEntity;
 import backtype.storm.Config;
 import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
 import backtype.storm.generated.*;
 import backtype.storm.utils.NimbusClient;
 import com.google.common.base.Preconditions;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/17316124/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormSubmitter.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormSubmitter.java
new file mode 100644
index 0000000..76d6e1b
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormSubmitter.java
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.impl;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.ClusterSummary;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.generated.Nimbus;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.SubmitOptions;
+import backtype.storm.generated.TopologySummary;
+import backtype.storm.utils.BufferFileInputStream;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+import backtype.storm.Config;
+
+/**
+ * Copied from Storm's StormSubmitter.
+ * Storm's StormSubmitter does not support submitting another jar once submittedJar is set, so it is rewritten here.
+ * Our implementation does not impose this restriction.
+ * Use this class to submit topologies to run on the Storm cluster. You should run your program
+ * with the "storm jar" command from the command-line, and then use this class to
+ * submit your topologies.
+ */
+
+public class StormSubmitter {
+    public static Logger LOG = LoggerFactory.getLogger(StormSubmitter.class);
+
+    private static final int THRIFT_CHUNK_SIZE_BYTES = 307200;
+
+    private static Nimbus.Iface localNimbus = null;
+
+    public static void setLocalNimbus(Nimbus.Iface localNimbusHandler) {
+        StormSubmitter.localNimbus = localNimbusHandler;
+    }
+
+    /**
+     * Submits a topology to run on the cluster. A topology runs forever or until
+     * explicitly killed.
+     *
+     *
+     * @param name the name of the storm.
+     * @param stormConf the topology-specific configuration. See {@link Config}.
+     * @param topology the processing to execute.
+     * @throws AlreadyAliveException if a topology with this name is already running
+     * @throws InvalidTopologyException if an invalid topology was submitted
+     */
+    public static void submitTopology(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
+        submitTopology(name, stormConf, topology, null, null);
+    }
+
+    /**
+     * Submits a topology to run on the cluster. A topology runs forever or until
+     * explicitly killed.
+     *
+     * @param name the name of the storm.
+     * @param stormConf the topology-specific configuration. See {@link Config}.
+     * @param topology the processing to execute.
+     * @param opts to manipulate the starting of the topology.
+     * @throws AlreadyAliveException if a topology with this name is already running
+     * @throws InvalidTopologyException if an invalid topology was submitted
+     */
+    public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts)
+        throws AlreadyAliveException, InvalidTopologyException {
+        submitTopology(name, stormConf, topology, opts, null);
+    }
+
+    /**
+     * Submits a topology to run on the cluster. A topology runs forever or until
+     * explicitly killed.
+     *
+     *
+     * @param name the name of the storm.
+     * @param stormConf the topology-specific configuration. See {@link Config}.
+     * @param topology the processing to execute.
+     * @param opts to manipulate the starting of the topology
+     * @param progressListener to track the progress of the jar upload process
+     * @throws AlreadyAliveException if a topology with this name is already running
+     * @throws InvalidTopologyException if an invalid topology was submitted
+     */
+    public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts,
+                                      ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException {
+        if (!Utils.isValidConf(stormConf)) {
+            throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
+        }
+        stormConf = new HashMap(stormConf);
+        stormConf.putAll(Utils.readCommandLineOpts());
+        Map conf = Utils.readStormConfig();
+        conf.putAll(stormConf);
+        try {
+            String serConf = JSONValue.toJSONString(stormConf);
+            if (localNimbus != null) {
+                LOG.info("Submitting topology " + name + " in local mode");
+                localNimbus.submitTopology(name, null, serConf, topology);
+            } else {
+                NimbusClient client = NimbusClient.getConfiguredClient(conf);
+                if (topologyNameExists(conf, name)) {
+                    throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
+                }
+                submitJar(conf, progressListener);
+                try {
+                    LOG.info("Submitting topology " +  name + " in distributed mode with conf " + serConf);
+                    if (opts != null) {
+                        client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
+                    } else {
+                        // this is for backwards compatibility
+                        client.getClient().submitTopology(name, submittedJar, serConf, topology);
+                    }
+                } catch (InvalidTopologyException e) {
+                    LOG.warn("Topology submission exception: " + e.get_msg());
+                    throw e;
+                } catch (AlreadyAliveException e) {
+                    LOG.warn("Topology already alive exception", e);
+                    throw e;
+                } finally {
+                    client.close();
+                }
+            }
+            LOG.info("Finished submitting topology: " +  name);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until
+     * explicitly killed.
+     *
+     *
+     * @param name the name of the storm.
+     * @param stormConf the topology-specific configuration. See {@link Config}.
+     * @param topology the processing to execute.
+     * @throws AlreadyAliveException if a topology with this name is already running
+     * @throws InvalidTopologyException if an invalid topology was submitted
+     */
+
+    public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
+        submitTopologyWithProgressBar(name, stormConf, topology, null);
+    }
+
+    /**
+     * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until
+     * explicitly killed.
+     *
+     *
+     * @param name the name of the storm.
+     * @param stormConf the topology-specific configuration. See {@link Config}.
+     * @param topology the processing to execute.
+     * @param opts to manipulate the starting of the topology
+     * @throws AlreadyAliveException if a topology with this name is already running
+     * @throws InvalidTopologyException if an invalid topology was submitted
+     */
+
+    public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException {
+        // show a progress bar so we know we're not stuck (especially on slow connections)
+        submitTopology(name, stormConf, topology, opts, new StormSubmitter.ProgressListener() {
+            @Override
+            public void onStart(String srcFile, String targetFile, long totalBytes) {
+                System.out.printf("Start uploading file '%s' to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
+            }
+
+            @Override
+            public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes) {
+                int length = 50;
+                int p = (int)((length * bytesUploaded) / totalBytes);
+                String progress = StringUtils.repeat("=", p);
+                String todo = StringUtils.repeat(" ", length - p);
+
+                System.out.printf("\r[%s%s] %d / %d", progress, todo, bytesUploaded, totalBytes);
+            }
+
+            @Override
+            public void onCompleted(String srcFile, String targetFile, long totalBytes) {
+                System.out.printf("\nFile '%s' uploaded to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
+            }
+        });
+    }
+
+    private static boolean topologyNameExists(Map conf, String name) {
+        NimbusClient client = NimbusClient.getConfiguredClient(conf);
+        try {
+            ClusterSummary summary = client.getClient().getClusterInfo();
+            for (TopologySummary s : summary.get_topologies()) {
+                if (s.get_name().equals(name)) {
+                    return true;
+                }
+            }
+            return false;
+
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            client.close();
+        }
+    }
+
+    private static String submittedJar = null;
+
+    private static void submitJar(Map conf, ProgressListener listener) {
+        LOG.info("before uploaded, submittedJar = {}", submittedJar);
+        String localJar = System.getProperty("storm.jar");
+        submittedJar = submitJar(conf, localJar, listener);
+        LOG.info("after uploaded, submittedJar = {}", submittedJar);
+    }
+
+    /**
+     * Submit jar file
+     * @param conf the topology-specific configuration. See {@link Config}.
+     * @param localJar file path of the jar file to submit
+     * @return the remote location of the submitted jar
+     */
+    public static String submitJar(Map conf, String localJar) {
+        return submitJar(conf, localJar, null);
+    }
+
+    /**
+     * Submit jar file
+     * @param conf the topology-specific configuration. See {@link Config}.
+     * @param localJar file path of the jar file to submit
+     * @param listener progress listener to track the jar file upload
+     * @return the remote location of the submitted jar
+     */
+    public static String submitJar(Map conf, String localJar, ProgressListener listener) {
+        if (localJar == null) {
+            throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
+        }
+
+        NimbusClient client = NimbusClient.getConfiguredClient(conf);
+        try {
+            String uploadLocation = client.getClient().beginFileUpload();
+            LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation);
+            BufferFileInputStream is = new BufferFileInputStream(localJar, THRIFT_CHUNK_SIZE_BYTES);
+
+            long totalSize = new File(localJar).length();
+            if (listener != null) {
+                listener.onStart(localJar, uploadLocation, totalSize);
+            }
+
+            long bytesUploaded = 0;
+            while (true) {
+                byte[] toSubmit = is.read();
+                bytesUploaded += toSubmit.length;
+                if (listener != null) {
+                    listener.onProgress(localJar, uploadLocation, bytesUploaded, totalSize);
+                }
+
+                if (toSubmit.length == 0) {
+                    break;
+                }
+                client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
+            }
+            client.getClient().finishFileUpload(uploadLocation);
+
+            if (listener != null) {
+                listener.onCompleted(localJar, uploadLocation, totalSize);
+            }
+
+            LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);
+            return uploadLocation;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            client.close();
+        }
+    }
+
+    /**
+     * Interface use to track progress of file upload.
+     */
+    public interface ProgressListener {
+        /**
+         * called before file is uploaded.
+         * @param srcFile - jar file to be uploaded
+         * @param targetFile - destination file
+         * @param totalBytes - total number of bytes of the file
+         */
+        public void onStart(String srcFile, String targetFile, long totalBytes);
+
+        /**
+         * called whenever a chunk of bytes is uploaded.
+         * @param srcFile - jar file to be uploaded
+         * @param targetFile - destination file
+         * @param bytesUploaded - number of bytes transferred so far
+         * @param totalBytes - total number of bytes of the file
+         */
+        public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes);
+
+        /**
+         * called when the file is uploaded.
+         * @param srcFile - jar file to be uploaded
+         * @param targetFile - destination file
+         * @param totalBytes - total number of bytes of the file
+         */
+        public void onCompleted(String srcFile, String targetFile, long totalBytes);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/17316124/eagle-core/eagle-common/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/pom.xml b/eagle-core/eagle-common/pom.xml
index 0fe052c..639d4d5 100644
--- a/eagle-core/eagle-common/pom.xml
+++ b/eagle-core/eagle-common/pom.xml
@@ -87,6 +87,16 @@
             <artifactId>junit</artifactId>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>com.google.inject.extensions</groupId>
+            <artifactId>guice-servlet</artifactId>
+            <version>${guice.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.inject</groupId>
+            <artifactId>guice</artifactId>
+            <version>${guice.version}</version>
+        </dependency>
     </dependencies>
 </project>
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/17316124/eagle-jpm/eagle-jpm-mr-history/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/pom.xml b/eagle-jpm/eagle-jpm-mr-history/pom.xml
index 9a0177c..6b04fee 100644
--- a/eagle-jpm/eagle-jpm-mr-history/pom.xml
+++ b/eagle-jpm/eagle-jpm-mr-history/pom.xml
@@ -74,41 +74,6 @@
                 </exclusion>
             </exclusions>
         </dependency>
-        <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-recipes</artifactId>
-            <version>${curator.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-annotations</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-hdfs</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-auth</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-app</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-core</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
     </dependencies>
     <build>
         <resources>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/17316124/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
index 268912c..27d1575 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
@@ -91,6 +91,9 @@ public class MRRunningJobFetchSpout extends BaseRichSpout {
                 apps = resourceFetcher.getResource(Constants.ResourceType.RUNNING_MR_JOB);
                 LOG.info("get {} apps from resource manager", apps.size());
                 Set<String> running = new HashSet<>();
+                for (AppInfo appInfo : apps) {
+                    running.add(appInfo.getId());
+                }
                 Iterator<String> appIdIterator = this.runningYarnApps.iterator();
                 while (appIdIterator.hasNext()) {
                     String appId = appIdIterator.next();
@@ -99,7 +102,6 @@ public class MRRunningJobFetchSpout extends BaseRichSpout {
                         if (appId.equals(appInfo.getId())) {
                             hasFinished = false;
                         }
-                        running.add(appInfo.getId());
                     }
 
                     if (hasFinished) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/17316124/eagle-jpm/eagle-jpm-spark-history/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/pom.xml b/eagle-jpm/eagle-jpm-spark-history/pom.xml
index 1c9c8b4..8f9828d 100644
--- a/eagle-jpm/eagle-jpm-spark-history/pom.xml
+++ b/eagle-jpm/eagle-jpm-spark-history/pom.xml
@@ -66,41 +66,6 @@
             <version>2.12</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-recipes</artifactId>
-            <version>${curator.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-annotations</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-hdfs</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-auth</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-app</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-core</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
             <groupId>org.apache.eagle</groupId>
             <artifactId>eagle-app-base</artifactId>
             <version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/17316124/eagle-jpm/eagle-jpm-spark-running/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/pom.xml b/eagle-jpm/eagle-jpm-spark-running/pom.xml
index 982e530..56114f7 100644
--- a/eagle-jpm/eagle-jpm-spark-running/pom.xml
+++ b/eagle-jpm/eagle-jpm-spark-running/pom.xml
@@ -50,11 +50,6 @@
             </exclusions>
         </dependency>
         <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-client</artifactId>
-            <version>${curator.version}</version>
-        </dependency>
-        <dependency>
             <groupId>org.jsoup</groupId>
             <artifactId>jsoup</artifactId>
         </dependency>
@@ -83,42 +78,6 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-hdfs-nfs</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-annotations</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-hdfs</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-auth</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-app</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-core</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
             <groupId>org.apache.eagle</groupId>
             <artifactId>eagle-app-base</artifactId>
             <version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/17316124/eagle-jpm/eagle-jpm-util/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/pom.xml b/eagle-jpm/eagle-jpm-util/pom.xml
index 0c6be01..e424e49 100644
--- a/eagle-jpm/eagle-jpm-util/pom.xml
+++ b/eagle-jpm/eagle-jpm-util/pom.xml
@@ -68,10 +68,5 @@
             <artifactId>commons-codec</artifactId>
             <version>1.9</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-recipes</artifactId>
-            <version>${curator.version}</version>
-        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/17316124/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
index 2d1af2c..253c61a 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
@@ -70,10 +70,10 @@ public class RunningJobManager implements Serializable {
         Map<String, Pair<Map<String, String>, AppInfo>> result = new HashMap<>();
         String path = this.zkRoot + "/" + yarnAppId;
         List<String> jobIds = curator.getChildren().forPath(path);
-        /*if (jobIds.size() == 0) {
+        if (jobIds.size() == 0) {
             LOG.info("delete empty path {}", path);
             delete(yarnAppId);
-        }*/
+        }
 
         for (String jobId : jobIds) {
             String jobPath = path + "/" + jobId;
@@ -104,8 +104,8 @@ public class RunningJobManager implements Serializable {
             appInfo.setStartedTime(Long.parseLong(appInfoMap.get("startedTime")));
             appInfo.setFinishedTime(Long.parseLong(appInfoMap.get("finishedTime")));
             appInfo.setElapsedTime(Long.parseLong(appInfoMap.get("elapsedTime")));
-            appInfo.setAmContainerLogs(appInfoMap.get("amContainerLogs"));
-            appInfo.setAmHostHttpAddress(appInfoMap.get("amHostHttpAddress"));
+            appInfo.setAmContainerLogs(appInfoMap.get("amContainerLogs") == null ? "" : appInfoMap.get("amContainerLogs"));
+            appInfo.setAmHostHttpAddress(appInfoMap.get("amHostHttpAddress") == null ? "" : appInfoMap.get("amHostHttpAddress"));
             appInfo.setAllocatedMB(Long.parseLong(appInfoMap.get("allocatedMB")));
             appInfo.setAllocatedVCores(Integer.parseInt(appInfoMap.get("allocatedVCores")));
             appInfo.setRunningContainers(Integer.parseInt(appInfoMap.get("runningContainers")));

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/17316124/eagle-jpm/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/pom.xml b/eagle-jpm/pom.xml
index fce9b6c..258de2d 100644
--- a/eagle-jpm/pom.xml
+++ b/eagle-jpm/pom.xml
@@ -47,6 +47,16 @@
 
     <dependencies>
         <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
+            <version>${curator.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-client</artifactId>
+            <version>${curator.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.eagle</groupId>
             <artifactId>eagle-entity-base</artifactId>
             <version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/17316124/eagle-server/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-server/pom.xml b/eagle-server/pom.xml
index 3f4c70b..99de507 100644
--- a/eagle-server/pom.xml
+++ b/eagle-server/pom.xml
@@ -159,7 +159,7 @@
         <profile>
             <id>dev</id>
             <activation>
-                <activeByDefault>false</activeByDefault>
+                <activeByDefault>true</activeByDefault>
             </activation>
 
             <!-- =========================================================================================================

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/17316124/eagle-topology-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-assembly/pom.xml b/eagle-topology-assembly/pom.xml
index 2cbba46..6a87a98 100644
--- a/eagle-topology-assembly/pom.xml
+++ b/eagle-topology-assembly/pom.xml
@@ -16,6 +16,13 @@
   ~ limitations under the License.
   -->
 
+<!--
+  ~ eagle-topology-assembly supports multiple versions of Hadoop.
+  ~ This pom defines two profiles: hadoop-2.6 (active by default) and hadoop-2.7.
+  ~ When you build, you can use -PXXX to select the Hadoop version that you need to support,
+  ~ e.g. mvn clean install -Phadoop-2.7
+  -->
+
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
@@ -33,6 +40,12 @@
             <groupId>org.apache.eagle</groupId>
             <artifactId>eagle-server</artifactId>
             <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hbase</groupId>
+                    <artifactId>hbase-server</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.eagle</groupId>
@@ -51,6 +64,92 @@
         </dependency>
     </dependencies>
 
+    <profiles>
+        <profile>
+            <id>hadoop-2.7</id>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-hdfs-nfs</artifactId>
+                    <version>${hadoop.2.7.version}</version>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                    <version>${hadoop.2.7.version}</version>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-annotations</artifactId>
+                    <version>${hadoop.2.7.version}</version>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-hdfs</artifactId>
+                    <version>${hadoop.2.7.version}</version>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-auth</artifactId>
+                    <version>${hadoop.2.7.version}</version>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-mapreduce-client-app</artifactId>
+                    <version>${hadoop.2.7.version}</version>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-mapreduce-client-core</artifactId>
+                    <version>${hadoop.2.7.version}</version>
+                </dependency>
+            </dependencies>
+        </profile>
+        <profile>
+            <id>hadoop-2.6</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                    <version>${hadoop.version}</version>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-annotations</artifactId>
+                    <version>${hadoop.version}</version>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-hdfs</artifactId>
+                    <version>${hadoop.version}</version>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-auth</artifactId>
+                    <version>${hadoop.version}</version>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-mapreduce-client-app</artifactId>
+                    <version>${hadoop.version}</version>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-mapreduce-client-core</artifactId>
+                    <version>${hadoop.version}</version>
+                </dependency>
+            </dependencies>
+        </profile>
+    </profiles>
+    <repositories>
+        <repository>
+            <id>hadoop.releases</id>
+            <url>http://repo.hortonworks.com/content/groups/public</url>
+        </repository>
+    </repositories>
     <build>
         <resources>
             <resource>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/17316124/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7246943..d12fb7d 100755
--- a/pom.xml
+++ b/pom.xml
@@ -168,7 +168,9 @@
         <checkstyle.version>7.1</checkstyle.version>
 
         <!-- Environment Versions-->
+        <thrift.version>0.9.2</thrift.version>
         <hadoop.version>2.6.0.2.2.5.1-3</hadoop.version>
+        <hadoop.2.7.version>2.7.1.2.4.2.0-258</hadoop.2.7.version>
         <hbase.version>0.98.4.2.2.5.1-3-hadoop2</hbase.version>
         <hive.version>1.2.1</hive.version>