You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/10/10 06:33:13 UTC
incubator-eagle git commit: [EAGLE-593] Eagle Topology assembly
multi-version Hadoop support & bug fix
Repository: incubator-eagle
Updated Branches:
refs/heads/master 9e8737705 -> 17316124a
[EAGLE-593] Eagle Topology assembly multi-version Hadoop support & bug fix
1. User may want to support different version of hadoop in eagle jpm when
uses topology asembly to be a final package. For this requirement, we
add profiles to eagle-topology-assembly pom.xml. In this pom, we could
add different versions of Hadoop.
Fix StormSubmitter does not support submit another jar In storm
StormSubmitter
2. private static String submittedJar = null; private static void
submitJar(Map conf, ProgressListener listener) { if(submittedJar==null)
{ LOG.info("Jar not uploaded to master yet. Submitting jar..."); String
localJar = System.getProperty("storm.jar"); submittedJar =
submitJar(conf, localJar, listener); } else { LOG.info("Jar already
uploaded to master. Not submitting jar."); } }
3. It can not submit another jar once submittedJar is set. So I rewrote
this class.
Fix MR Running Job that does not set finished jobs' state
Closes #481
Closes #155
Closes #135
Closes #61
Eagle Topology assembly multi-version Hadoop support & bug fix
Eagle Topology assembly multi-version Hadoop support & bug fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/17316124
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/17316124
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/17316124
Branch: refs/heads/master
Commit: 17316124af9d82e07c0589550854c274d0cb1059
Parents: 9e87377
Author: wujinhu <wu...@126.com>
Authored: Mon Oct 10 10:27:59 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Mon Oct 10 14:27:28 2016 +0800
----------------------------------------------------------------------
eagle-core/eagle-app/eagle-app-base/pom.xml | 5 +
.../environment/impl/StormExecutionRuntime.java | 1 -
.../app/environment/impl/StormSubmitter.java | 320 +++++++++++++++++++
eagle-core/eagle-common/pom.xml | 10 +
eagle-jpm/eagle-jpm-mr-history/pom.xml | 35 --
.../running/storm/MRRunningJobFetchSpout.java | 4 +-
eagle-jpm/eagle-jpm-spark-history/pom.xml | 35 --
eagle-jpm/eagle-jpm-spark-running/pom.xml | 41 ---
eagle-jpm/eagle-jpm-util/pom.xml | 5 -
.../jpm/util/jobrecover/RunningJobManager.java | 8 +-
eagle-jpm/pom.xml | 10 +
eagle-server/pom.xml | 2 +-
eagle-topology-assembly/pom.xml | 99 ++++++
pom.xml | 2 +
14 files changed, 454 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/17316124/eagle-core/eagle-app/eagle-app-base/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/pom.xml b/eagle-core/eagle-app/eagle-app-base/pom.xml
index 9b0b29e..b3c9202 100644
--- a/eagle-core/eagle-app/eagle-app-base/pom.xml
+++ b/eagle-core/eagle-app/eagle-app-base/pom.xml
@@ -62,6 +62,11 @@
<version>1.1.1</version>
</dependency>
<dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <version>${thrift.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.eagle</groupId>
<artifactId>alert-metadata</artifactId>
<version>${project.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/17316124/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
index 7817c73..5f74d01 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
@@ -24,7 +24,6 @@ import org.apache.eagle.app.utils.DynamicJarPathFinder;
import org.apache.eagle.metadata.model.ApplicationEntity;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
import backtype.storm.generated.*;
import backtype.storm.utils.NimbusClient;
import com.google.common.base.Preconditions;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/17316124/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormSubmitter.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormSubmitter.java
new file mode 100644
index 0000000..76d6e1b
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormSubmitter.java
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.impl;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.ClusterSummary;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.generated.Nimbus;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.SubmitOptions;
+import backtype.storm.generated.TopologySummary;
+import backtype.storm.utils.BufferFileInputStream;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+import backtype.storm.Config;
+
+/**
+ * copy from storm StormSubmitter
+ * just rewrite StormSubmitter that does not support summit other jars once submittedJar is set.
+ * Our implementation will not add this restrict.
+ * Use this class to submit topologies to run on the Storm cluster. You should run your program
+ * with the "storm jar" command from the command-line, and then use this class to
+ * submit your topologies.
+ */
+
+public class StormSubmitter {
+ public static Logger LOG = LoggerFactory.getLogger(StormSubmitter.class);
+
+ private static final int THRIFT_CHUNK_SIZE_BYTES = 307200;
+
+ private static Nimbus.Iface localNimbus = null;
+
+ public static void setLocalNimbus(Nimbus.Iface localNimbusHandler) {
+ StormSubmitter.localNimbus = localNimbusHandler;
+ }
+
+ /**
+ * Submits a topology to run on the cluster. A topology runs forever or until
+ * explicitly killed.
+ *
+ *
+ * @param name the name of the storm.
+ * @param stormConf the topology-specific configuration. See {@link Config}.
+ * @param topology the processing to execute.
+ * @throws AlreadyAliveException if a topology with this name is already running
+ * @throws InvalidTopologyException if an invalid topology was submitted
+ */
+ public static void submitTopology(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
+ submitTopology(name, stormConf, topology, null, null);
+ }
+
+ /**
+ * Submits a topology to run on the cluster. A topology runs forever or until
+ * explicitly killed.
+ *
+ * @param name the name of the storm.
+ * @param stormConf the topology-specific configuration. See {@link Config}.
+ * @param topology the processing to execute.
+ * @param opts to manipulate the starting of the topology.
+ * @throws AlreadyAliveException if a topology with this name is already running
+ * @throws InvalidTopologyException if an invalid topology was submitted
+ */
+ public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts)
+ throws AlreadyAliveException, InvalidTopologyException {
+ submitTopology(name, stormConf, topology, opts, null);
+ }
+
+ /**
+ * Submits a topology to run on the cluster. A topology runs forever or until
+ * explicitly killed.
+ *
+ *
+ * @param name the name of the storm.
+ * @param stormConf the topology-specific configuration. See {@link Config}.
+ * @param topology the processing to execute.
+ * @param opts to manipulate the starting of the topology
+ * @param progressListener to track the progress of the jar upload process
+ * @throws AlreadyAliveException if a topology with this name is already running
+ * @throws InvalidTopologyException if an invalid topology was submitted
+ */
+ public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts,
+ ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException {
+ if (!Utils.isValidConf(stormConf)) {
+ throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
+ }
+ stormConf = new HashMap(stormConf);
+ stormConf.putAll(Utils.readCommandLineOpts());
+ Map conf = Utils.readStormConfig();
+ conf.putAll(stormConf);
+ try {
+ String serConf = JSONValue.toJSONString(stormConf);
+ if (localNimbus != null) {
+ LOG.info("Submitting topology " + name + " in local mode");
+ localNimbus.submitTopology(name, null, serConf, topology);
+ } else {
+ NimbusClient client = NimbusClient.getConfiguredClient(conf);
+ if (topologyNameExists(conf, name)) {
+ throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
+ }
+ submitJar(conf, progressListener);
+ try {
+ LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
+ if (opts != null) {
+ client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
+ } else {
+ // this is for backwards compatibility
+ client.getClient().submitTopology(name, submittedJar, serConf, topology);
+ }
+ } catch (InvalidTopologyException e) {
+ LOG.warn("Topology submission exception: " + e.get_msg());
+ throw e;
+ } catch (AlreadyAliveException e) {
+ LOG.warn("Topology already alive exception", e);
+ throw e;
+ } finally {
+ client.close();
+ }
+ }
+ LOG.info("Finished submitting topology: " + name);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until
+ * explicitly killed.
+ *
+ *
+ * @param name the name of the storm.
+ * @param stormConf the topology-specific configuration. See {@link Config}.
+ * @param topology the processing to execute.
+ * @throws AlreadyAliveException if a topology with this name is already running
+ * @throws InvalidTopologyException if an invalid topology was submitted
+ */
+
+ public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
+ submitTopologyWithProgressBar(name, stormConf, topology, null);
+ }
+
+ /**
+ * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until
+ * explicitly killed.
+ *
+ *
+ * @param name the name of the storm.
+ * @param stormConf the topology-specific configuration. See {@link Config}.
+ * @param topology the processing to execute.
+ * @param opts to manipulate the starting of the topology
+ * @throws AlreadyAliveException if a topology with this name is already running
+ * @throws InvalidTopologyException if an invalid topology was submitted
+ */
+
+ public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException {
+ // show a progress bar so we know we're not stuck (especially on slow connections)
+ submitTopology(name, stormConf, topology, opts, new StormSubmitter.ProgressListener() {
+ @Override
+ public void onStart(String srcFile, String targetFile, long totalBytes) {
+ System.out.printf("Start uploading file '%s' to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
+ }
+
+ @Override
+ public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes) {
+ int length = 50;
+ int p = (int)((length * bytesUploaded) / totalBytes);
+ String progress = StringUtils.repeat("=", p);
+ String todo = StringUtils.repeat(" ", length - p);
+
+ System.out.printf("\r[%s%s] %d / %d", progress, todo, bytesUploaded, totalBytes);
+ }
+
+ @Override
+ public void onCompleted(String srcFile, String targetFile, long totalBytes) {
+ System.out.printf("\nFile '%s' uploaded to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
+ }
+ });
+ }
+
+ private static boolean topologyNameExists(Map conf, String name) {
+ NimbusClient client = NimbusClient.getConfiguredClient(conf);
+ try {
+ ClusterSummary summary = client.getClient().getClusterInfo();
+ for (TopologySummary s : summary.get_topologies()) {
+ if (s.get_name().equals(name)) {
+ return true;
+ }
+ }
+ return false;
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ client.close();
+ }
+ }
+
+ private static String submittedJar = null;
+
+ private static void submitJar(Map conf, ProgressListener listener) {
+ LOG.info("before uploaded, submittedJar = {}", submittedJar);
+ String localJar = System.getProperty("storm.jar");
+ submittedJar = submitJar(conf, localJar, listener);
+ LOG.info("after uploaded, submittedJar = {}", submittedJar);
+ }
+
+ /**
+ * Submit jar file
+ * @param conf the topology-specific configuration. See {@link Config}.
+ * @param localJar file path of the jar file to submit
+ * @return the remote location of the submitted jar
+ */
+ public static String submitJar(Map conf, String localJar) {
+ return submitJar(conf, localJar, null);
+ }
+
+ /**
+ * Submit jar file
+ * @param conf the topology-specific configuration. See {@link Config}.
+ * @param localJar file path of the jar file to submit
+ * @param listener progress listener to track the jar file upload
+ * @return the remote location of the submitted jar
+ */
+ public static String submitJar(Map conf, String localJar, ProgressListener listener) {
+ if (localJar == null) {
+ throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
+ }
+
+ NimbusClient client = NimbusClient.getConfiguredClient(conf);
+ try {
+ String uploadLocation = client.getClient().beginFileUpload();
+ LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation);
+ BufferFileInputStream is = new BufferFileInputStream(localJar, THRIFT_CHUNK_SIZE_BYTES);
+
+ long totalSize = new File(localJar).length();
+ if (listener != null) {
+ listener.onStart(localJar, uploadLocation, totalSize);
+ }
+
+ long bytesUploaded = 0;
+ while (true) {
+ byte[] toSubmit = is.read();
+ bytesUploaded += toSubmit.length;
+ if (listener != null) {
+ listener.onProgress(localJar, uploadLocation, bytesUploaded, totalSize);
+ }
+
+ if (toSubmit.length == 0) {
+ break;
+ }
+ client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
+ }
+ client.getClient().finishFileUpload(uploadLocation);
+
+ if (listener != null) {
+ listener.onCompleted(localJar, uploadLocation, totalSize);
+ }
+
+ LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);
+ return uploadLocation;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ client.close();
+ }
+ }
+
+ /**
+ * Interface use to track progress of file upload.
+ */
+ public interface ProgressListener {
+ /**
+ * called before file is uploaded.
+ * @param srcFile - jar file to be uploaded
+ * @param targetFile - destination file
+ * @param totalBytes - total number of bytes of the file
+ */
+ public void onStart(String srcFile, String targetFile, long totalBytes);
+
+ /**
+ * called whenever a chunk of bytes is uploaded.
+ * @param srcFile - jar file to be uploaded
+ * @param targetFile - destination file
+ * @param bytesUploaded - number of bytes transferred so far
+ * @param totalBytes - total number of bytes of the file
+ */
+ public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes);
+
+ /**
+ * called when the file is uploaded.
+ * @param srcFile - jar file to be uploaded
+ * @param targetFile - destination file
+ * @param totalBytes - total number of bytes of the file
+ */
+ public void onCompleted(String srcFile, String targetFile, long totalBytes);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/17316124/eagle-core/eagle-common/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/pom.xml b/eagle-core/eagle-common/pom.xml
index 0fe052c..639d4d5 100644
--- a/eagle-core/eagle-common/pom.xml
+++ b/eagle-core/eagle-common/pom.xml
@@ -87,6 +87,16 @@
<artifactId>junit</artifactId>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>com.google.inject.extensions</groupId>
+ <artifactId>guice-servlet</artifactId>
+ <version>${guice.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.inject</groupId>
+ <artifactId>guice</artifactId>
+ <version>${guice.version}</version>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/17316124/eagle-jpm/eagle-jpm-mr-history/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/pom.xml b/eagle-jpm/eagle-jpm-mr-history/pom.xml
index 9a0177c..6b04fee 100644
--- a/eagle-jpm/eagle-jpm-mr-history/pom.xml
+++ b/eagle-jpm/eagle-jpm-mr-history/pom.xml
@@ -74,41 +74,6 @@
</exclusion>
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>${curator.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-annotations</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-auth</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-app</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
</dependencies>
<build>
<resources>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/17316124/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
index 268912c..27d1575 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
@@ -91,6 +91,9 @@ public class MRRunningJobFetchSpout extends BaseRichSpout {
apps = resourceFetcher.getResource(Constants.ResourceType.RUNNING_MR_JOB);
LOG.info("get {} apps from resource manager", apps.size());
Set<String> running = new HashSet<>();
+ for (AppInfo appInfo : apps) {
+ running.add(appInfo.getId());
+ }
Iterator<String> appIdIterator = this.runningYarnApps.iterator();
while (appIdIterator.hasNext()) {
String appId = appIdIterator.next();
@@ -99,7 +102,6 @@ public class MRRunningJobFetchSpout extends BaseRichSpout {
if (appId.equals(appInfo.getId())) {
hasFinished = false;
}
- running.add(appInfo.getId());
}
if (hasFinished) {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/17316124/eagle-jpm/eagle-jpm-spark-history/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/pom.xml b/eagle-jpm/eagle-jpm-spark-history/pom.xml
index 1c9c8b4..8f9828d 100644
--- a/eagle-jpm/eagle-jpm-spark-history/pom.xml
+++ b/eagle-jpm/eagle-jpm-spark-history/pom.xml
@@ -66,41 +66,6 @@
<version>2.12</version>
</dependency>
<dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>${curator.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-annotations</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-auth</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-app</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
<groupId>org.apache.eagle</groupId>
<artifactId>eagle-app-base</artifactId>
<version>${project.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/17316124/eagle-jpm/eagle-jpm-spark-running/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/pom.xml b/eagle-jpm/eagle-jpm-spark-running/pom.xml
index 982e530..56114f7 100644
--- a/eagle-jpm/eagle-jpm-spark-running/pom.xml
+++ b/eagle-jpm/eagle-jpm-spark-running/pom.xml
@@ -50,11 +50,6 @@
</exclusions>
</dependency>
<dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-client</artifactId>
- <version>${curator.version}</version>
- </dependency>
- <dependency>
<groupId>org.jsoup</groupId>
<artifactId>jsoup</artifactId>
</dependency>
@@ -83,42 +78,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs-nfs</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-annotations</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-auth</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-app</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
<groupId>org.apache.eagle</groupId>
<artifactId>eagle-app-base</artifactId>
<version>${project.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/17316124/eagle-jpm/eagle-jpm-util/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/pom.xml b/eagle-jpm/eagle-jpm-util/pom.xml
index 0c6be01..e424e49 100644
--- a/eagle-jpm/eagle-jpm-util/pom.xml
+++ b/eagle-jpm/eagle-jpm-util/pom.xml
@@ -68,10 +68,5 @@
<artifactId>commons-codec</artifactId>
<version>1.9</version>
</dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>${curator.version}</version>
- </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/17316124/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
index 2d1af2c..253c61a 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
@@ -70,10 +70,10 @@ public class RunningJobManager implements Serializable {
Map<String, Pair<Map<String, String>, AppInfo>> result = new HashMap<>();
String path = this.zkRoot + "/" + yarnAppId;
List<String> jobIds = curator.getChildren().forPath(path);
- /*if (jobIds.size() == 0) {
+ if (jobIds.size() == 0) {
LOG.info("delete empty path {}", path);
delete(yarnAppId);
- }*/
+ }
for (String jobId : jobIds) {
String jobPath = path + "/" + jobId;
@@ -104,8 +104,8 @@ public class RunningJobManager implements Serializable {
appInfo.setStartedTime(Long.parseLong(appInfoMap.get("startedTime")));
appInfo.setFinishedTime(Long.parseLong(appInfoMap.get("finishedTime")));
appInfo.setElapsedTime(Long.parseLong(appInfoMap.get("elapsedTime")));
- appInfo.setAmContainerLogs(appInfoMap.get("amContainerLogs"));
- appInfo.setAmHostHttpAddress(appInfoMap.get("amHostHttpAddress"));
+ appInfo.setAmContainerLogs(appInfoMap.get("amContainerLogs") == null ? "" : appInfoMap.get("amContainerLogs"));
+ appInfo.setAmHostHttpAddress(appInfoMap.get("amHostHttpAddress") == null ? "" : appInfoMap.get("amHostHttpAddress"));
appInfo.setAllocatedMB(Long.parseLong(appInfoMap.get("allocatedMB")));
appInfo.setAllocatedVCores(Integer.parseInt(appInfoMap.get("allocatedVCores")));
appInfo.setRunningContainers(Integer.parseInt(appInfoMap.get("runningContainers")));
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/17316124/eagle-jpm/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/pom.xml b/eagle-jpm/pom.xml
index fce9b6c..258de2d 100644
--- a/eagle-jpm/pom.xml
+++ b/eagle-jpm/pom.xml
@@ -47,6 +47,16 @@
<dependencies>
<dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-client</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.eagle</groupId>
<artifactId>eagle-entity-base</artifactId>
<version>${project.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/17316124/eagle-server/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-server/pom.xml b/eagle-server/pom.xml
index 3f4c70b..99de507 100644
--- a/eagle-server/pom.xml
+++ b/eagle-server/pom.xml
@@ -159,7 +159,7 @@
<profile>
<id>dev</id>
<activation>
- <activeByDefault>false</activeByDefault>
+ <activeByDefault>true</activeByDefault>
</activation>
<!-- =========================================================================================================
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/17316124/eagle-topology-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-assembly/pom.xml b/eagle-topology-assembly/pom.xml
index 2cbba46..6a87a98 100644
--- a/eagle-topology-assembly/pom.xml
+++ b/eagle-topology-assembly/pom.xml
@@ -16,6 +16,13 @@
~ limitations under the License.
-->
+<!--
+ ~ eagle-topology-assembly supports multi version of Hadoop
+ ~ In this pom, we add two versions
+ ~ When you compile, you can you -PXXX to use the Hadoop version that you need to support
+ ~ like mvn clean install -Phadoop-2.7
+ -->
+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
@@ -33,6 +40,12 @@
<groupId>org.apache.eagle</groupId>
<artifactId>eagle-server</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.eagle</groupId>
@@ -51,6 +64,92 @@
</dependency>
</dependencies>
+ <profiles>
+ <profile>
+ <id>hadoop-2.7</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-nfs</artifactId>
+ <version>${hadoop.2.7.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.2.7.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ <version>${hadoop.2.7.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop.2.7.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ <version>${hadoop.2.7.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-app</artifactId>
+ <version>${hadoop.2.7.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.2.7.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <id>hadoop-2.6</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-app</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+ <repositories>
+ <repository>
+ <id>hadoop.releases</id>
+ <url>http://repo.hortonworks.com/content/groups/public</url>
+ </repository>
+ </repositories>
<build>
<resources>
<resource>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/17316124/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7246943..d12fb7d 100755
--- a/pom.xml
+++ b/pom.xml
@@ -168,7 +168,9 @@
<checkstyle.version>7.1</checkstyle.version>
<!-- Environment Versions-->
+ <thrift.version>0.9.2</thrift.version>
<hadoop.version>2.6.0.2.2.5.1-3</hadoop.version>
+ <hadoop.2.7.version>2.7.1.2.4.2.0-258</hadoop.2.7.version>
<hbase.version>0.98.4.2.2.5.1-3-hadoop2</hbase.version>
<hive.version>1.2.1</hive.version>