You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/07/05 18:07:47 UTC
[8/8] incubator-eagle git commit: EAGLE-276 eagle support for mr &
spark history job monitoring mr & spark job history monitoring
EAGLE-276 eagle support for mr & spark history job monitoring
mr & spark job history monitoring
Author: @wujinhu <ji...@ebay.com>
Reviewer: @yonzhang <yo...@apache.org>
Closes: #217
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/fe509125
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/fe509125
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/fe509125
Branch: refs/heads/develop
Commit: fe50912574dfe2126da0154b8544d81022126acc
Parents: f857cbe
Author: yonzhang <yo...@gmail.com>
Authored: Tue Jul 5 11:10:45 2016 -0700
Committer: yonzhang <yo...@gmail.com>
Committed: Tue Jul 5 11:10:45 2016 -0700
----------------------------------------------------------------------
.../apache/eagle/common/SerializableUtils.java | 126 ++++
.../entity/meta/DefaultJavaObjctSerDeser.java | 41 ++
.../entity/meta/EntityDefinitionManager.java | 21 +-
eagle-jpm/eagle-jpa-spark-history/pom.xml | 66 ++
eagle-jpm/eagle-jpa-spark-running/pom.xml | 66 ++
eagle-jpm/eagle-jpm-entity/pom.xml | 52 ++
.../eagle/jpm/entity/JPMEntityRepository.java | 30 +
.../org/apache/eagle/jpm/entity/JobConfig.java | 38 +
.../org/apache/eagle/jpm/entity/SparkApp.java | 428 ++++++++++++
.../apache/eagle/jpm/entity/SparkExecutor.java | 233 +++++++
.../org/apache/eagle/jpm/entity/SparkJob.java | 178 +++++
.../org/apache/eagle/jpm/entity/SparkStage.java | 299 ++++++++
.../org/apache/eagle/jpm/entity/SparkTask.java | 290 ++++++++
eagle-jpm/eagle-jpm-mr-history/pom.xml | 138 ++++
.../assembly/eagle-jpm-mr-history-assembly.xml | 65 ++
.../eagle/jpm/mr/history/MRHistoryJobMain.java | 87 +++
.../jpm/mr/history/common/JHFConfigManager.java | 182 +++++
.../jpm/mr/history/common/JPAConstants.java | 95 +++
.../eagle/jpm/mr/history/common/JobConfig.java | 38 +
.../history/crawler/AbstractJobHistoryDAO.java | 194 +++++
.../crawler/DefaultJHFInputStreamCallback.java | 66 ++
.../mr/history/crawler/JHFCrawlerDriver.java | 27 +
.../history/crawler/JHFCrawlerDriverImpl.java | 277 ++++++++
.../history/crawler/JHFInputStreamCallback.java | 37 +
.../crawler/JobHistoryContentFilter.java | 36 +
.../crawler/JobHistoryContentFilterBuilder.java | 91 +++
.../crawler/JobHistoryContentFilterImpl.java | 94 +++
.../mr/history/crawler/JobHistoryDAOImpl.java | 203 ++++++
.../jpm/mr/history/crawler/JobHistoryLCM.java | 86 +++
.../JobHistorySpoutCollectorInterceptor.java | 36 +
.../history/entities/JPAEntityRepository.java | 40 ++
.../mr/history/entities/JobBaseAPIEntity.java | 24 +
.../mr/history/entities/JobConfigSerDeser.java | 63 ++
.../entities/JobConfigurationAPIEntity.java | 67 ++
.../history/entities/JobCountersSerDeser.java | 166 +++++
.../mr/history/entities/JobEventAPIEntity.java | 44 ++
.../history/entities/JobExecutionAPIEntity.java | 132 ++++
.../entities/JobProcessTimeStampEntity.java | 44 ++
.../entities/TaskAttemptCounterAPIEntity.java | 61 ++
.../entities/TaskAttemptExecutionAPIEntity.java | 101 +++
.../entities/TaskExecutionAPIEntity.java | 89 +++
.../entities/TaskFailureCountAPIEntity.java | 67 ++
.../jobcounter/CounterGroupDictionary.java | 238 +++++++
.../mr/history/jobcounter/CounterGroupKey.java | 32 +
.../jpm/mr/history/jobcounter/CounterKey.java | 30 +
.../history/jobcounter/JobCounterException.java | 63 ++
.../jpm/mr/history/jobcounter/JobCounters.java | 47 ++
.../jpm/mr/history/parser/EagleJobStatus.java | 28 +
.../jpm/mr/history/parser/EagleJobTagName.java | 48 ++
.../jpm/mr/history/parser/EagleTaskStatus.java | 25 +
.../HistoryJobEntityCreationListener.java | 39 ++
.../HistoryJobEntityLifecycleListener.java | 34 +
.../jpm/mr/history/parser/ImportException.java | 33 +
.../mr/history/parser/JHFEventReaderBase.java | 405 +++++++++++
.../eagle/jpm/mr/history/parser/JHFFormat.java | 24 +
.../mr/history/parser/JHFMRVer1EventReader.java | 150 ++++
.../jpm/mr/history/parser/JHFMRVer1Parser.java | 271 +++++++
.../parser/JHFMRVer1PerLineListener.java | 39 ++
.../mr/history/parser/JHFMRVer2EventReader.java | 380 ++++++++++
.../jpm/mr/history/parser/JHFMRVer2Parser.java | 87 +++
.../jpm/mr/history/parser/JHFParserBase.java | 35 +
.../jpm/mr/history/parser/JHFParserFactory.java | 71 ++
.../parser/JHFWriteNotCompletedException.java | 36 +
...JobConfigurationCreationServiceListener.java | 92 +++
.../JobEntityCreationEagleServiceListener.java | 127 ++++
.../parser/JobEntityCreationPublisher.java | 47 ++
.../parser/JobEntityLifecycleAggregator.java | 176 +++++
.../mr/history/parser/MRErrorClassifier.java | 112 +++
.../jpm/mr/history/parser/RecordTypes.java | 26 +
.../parser/TaskAttemptCounterListener.java | 152 ++++
.../mr/history/parser/TaskFailureListener.java | 137 ++++
.../history/storm/DefaultJobIdPartitioner.java | 28 +
.../history/storm/HistoryJobProgressBolt.java | 132 ++++
.../jpm/mr/history/storm/JobHistorySpout.java | 208 ++++++
.../eagle/jpm/mr/history/storm/JobIdFilter.java | 23 +
.../history/storm/JobIdFilterByPartition.java | 40 ++
.../jpm/mr/history/storm/JobIdPartitioner.java | 23 +
.../mr/history/zkres/JobHistoryZKStateLCM.java | 31 +
.../history/zkres/JobHistoryZKStateManager.java | 305 ++++++++
.../src/main/resources/JobCounter.conf | 185 +++++
.../services/org.apache.hadoop.fs.FileSystem | 20 +
.../src/main/resources/MRErrorCategory.config | 41 ++
.../src/main/resources/application.conf | 85 +++
.../src/main/resources/core-site.xml | 497 +++++++++++++
.../src/main/resources/hdfs-site.xml | 449 ++++++++++++
.../src/main/resources/log4j.properties | 34 +
eagle-jpm/eagle-jpm-spark-history/pom.xml | 122 ++++
.../eagle-jpm-spark-history-assembly.xml | 65 ++
.../history/config/SparkHistoryCrawlConfig.java | 122 ++++
.../jpm/spark/history/crawl/EventType.java | 24 +
.../history/crawl/JHFInputStreamReader.java | 25 +
.../jpm/spark/history/crawl/JHFParserBase.java | 29 +
.../history/crawl/JHFSparkEventReader.java | 699 +++++++++++++++++++
.../jpm/spark/history/crawl/JHFSparkParser.java | 63 ++
.../history/crawl/SparkApplicationInfo.java | 69 ++
.../SparkHistoryFileInputStreamReaderImpl.java | 53 ++
.../status/JobHistoryZKStateManager.java | 262 +++++++
.../spark/history/status/ZKStateConstant.java | 27 +
.../history/storm/FinishedSparkJobSpout.java | 152 ++++
.../history/storm/SparkHistoryTopology.java | 81 +++
.../spark/history/storm/SparkJobParseBolt.java | 178 +++++
.../eagle/jpm/spark/history/storm/TestHDFS.java | 47 ++
.../services/org.apache.hadoop.fs.FileSystem | 20 +
.../src/main/resources/application.conf | 77 ++
.../src/main/resources/log4j.properties | 35 +
eagle-jpm/eagle-jpm-spark-running/pom.xml | 66 ++
eagle-jpm/eagle-jpm-util/pom.xml | 65 ++
.../org/apache/eagle/jpm/util/Constants.java | 49 ++
.../org/apache/eagle/jpm/util/HDFSUtil.java | 44 ++
.../org/apache/eagle/jpm/util/JSONUtil.java | 66 ++
.../eagle/jpm/util/JobNameNormalization.java | 118 ++++
.../eagle/jpm/util/SparkEntityConstant.java | 29 +
.../apache/eagle/jpm/util/SparkJobTagName.java | 44 ++
.../util/resourceFetch/RMResourceFetcher.java | 98 +++
.../jpm/util/resourceFetch/ResourceFetcher.java | 27 +
.../SparkHistoryServerResourceFetcher.java | 81 +++
.../connection/InputStreamUtils.java | 69 ++
.../util/resourceFetch/connection/JobUtils.java | 43 ++
.../connection/URLConnectionUtils.java | 102 +++
.../util/resourceFetch/ha/HAURLSelector.java | 28 +
.../resourceFetch/ha/HAURLSelectorImpl.java | 101 +++
.../jpm/util/resourceFetch/model/AppInfo.java | 146 ++++
.../util/resourceFetch/model/Applications.java | 38 +
.../util/resourceFetch/model/AppsWrapper.java | 36 +
.../resourceFetch/model/SparkApplication.java | 57 ++
.../model/SparkApplicationAttempt.java | 73 ++
.../model/SparkApplicationWrapper.java | 38 +
.../url/JobListServiceURLBuilderImpl.java | 37 +
.../resourceFetch/url/ServiceURLBuilder.java | 21 +
.../SparkCompleteJobServiceURLBuilderImpl.java | 29 +
.../url/SparkJobServiceURLBuilderImpl.java | 29 +
.../src/main/resources/application.properties | 23 +
eagle-jpm/pom.xml | 54 ++
133 files changed, 13354 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-core/eagle-query/eagle-common/src/main/java/org/apache/eagle/common/SerializableUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-common/src/main/java/org/apache/eagle/common/SerializableUtils.java b/eagle-core/eagle-query/eagle-common/src/main/java/org/apache/eagle/common/SerializableUtils.java
new file mode 100644
index 0000000..c5823ea
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-common/src/main/java/org/apache/eagle/common/SerializableUtils.java
@@ -0,0 +1,126 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.common;
+
+import org.xerial.snappy.SnappyInputStream;
+import org.xerial.snappy.SnappyOutputStream;
+
+import java.io.*;
+
+/**
+ * Utilities for working with Serializables.
+ *
+ * Derived from "com.google.cloud.dataflow.sdk.util.SerializableUtils":
+ * https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/SerializableUtils.java
+ */
+public class SerializableUtils {
+ /**
+ * Serializes the argument into an array of bytes, and returns it.
+ *
+ * @throws IllegalArgumentException if there are errors when serializing
+ */
+ public static byte[] serializeToCompressedByteArray(Object value) {
+ try {
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ try (ObjectOutputStream oos = new ObjectOutputStream(new SnappyOutputStream(buffer))) {
+ oos.writeObject(value);
+ }
+ return buffer.toByteArray();
+ } catch (IOException exn) {
+ throw new IllegalArgumentException(
+ "unable to serialize " + value,
+ exn);
+ }
+ }
+
+ /**
+ * Serializes the argument into an array of bytes, and returns it.
+ *
+ * @throws IllegalArgumentException if there are errors when serializing
+ */
+ public static byte[] serializeToByteArray(Object value) {
+ try {
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ try (ObjectOutputStream oos = new ObjectOutputStream(buffer)) {
+ oos.writeObject(value);
+ }
+ return buffer.toByteArray();
+ } catch (IOException exn) {
+ throw new IllegalArgumentException("unable to serialize " + value, exn);
+ }
+ }
+
+ /**
+ * Deserializes an object from the given array of bytes, e.g., as
+ * serialized using {@link #serializeToCompressedByteArray}, and returns it.
+ *
+ * @throws IllegalArgumentException if there are errors when
+ * deserializing, using the provided description to identify what
+ * was being deserialized
+ */
+ public static Object deserializeFromByteArray(byte[] encodedValue,
+ String description) {
+ try {
+ try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(encodedValue))) {
+ return ois.readObject();
+ }
+ } catch (IOException | ClassNotFoundException exn) {
+ throw new IllegalArgumentException(
+ "unable to deserialize " + description,
+ exn);
+ }
+ }
+
+ /**
+ * Deserializes an object from the given array of bytes, e.g., as
+ * serialized using {@link #serializeToCompressedByteArray}, and returns it.
+ *
+ * @throws IllegalArgumentException if there are errors when
+ * deserializing, using the provided description to identify what
+ * was being deserialized
+ */
+ public static Object deserializeFromCompressedByteArray(byte[] encodedValue,
+ String description) {
+ try {
+ try (ObjectInputStream ois = new ObjectInputStream(
+ new SnappyInputStream(new ByteArrayInputStream(encodedValue)))) {
+ return ois.readObject();
+ }
+ } catch (IOException | ClassNotFoundException exn) {
+ throw new IllegalArgumentException(
+ "unable to deserialize " + description,
+ exn);
+ }
+ }
+
+ public static <T extends Serializable> T ensureSerializable(T value) {
+ @SuppressWarnings("unchecked")
+ T copy = (T) deserializeFromCompressedByteArray(serializeToCompressedByteArray(value),
+ value.toString());
+ return copy;
+ }
+
+ public static <T extends Serializable> T clone(T value) {
+ @SuppressWarnings("unchecked")
+ T copy = (T) deserializeFromCompressedByteArray(serializeToCompressedByteArray(value),
+ value.toString());
+ return copy;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DefaultJavaObjctSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DefaultJavaObjctSerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DefaultJavaObjctSerDeser.java
new file mode 100644
index 0000000..24385a9
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DefaultJavaObjctSerDeser.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.log.entity.meta;
+
+import org.apache.eagle.common.SerializableUtils;
+
+public class DefaultJavaObjctSerDeser implements EntitySerDeser<Object> {
+ public final static EntitySerDeser<Object> INSTANCE = new DefaultJavaObjctSerDeser();
+
+ @Override
+ public Object deserialize(byte[] bytes) {
+ return SerializableUtils.deserializeFromByteArray(bytes,"Deserialize from java object bytes");
+ }
+
+ @Override
+ public byte[] serialize(Object o) {
+ return SerializableUtils.serializeToByteArray(o);
+ }
+
+ @Override
+ public Class<Object> type() {
+ return Object.class;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java
index d990fb5..7b1010d 100755
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java
@@ -16,12 +16,6 @@
*/
package org.apache.eagle.log.entity.meta;
-import java.lang.reflect.Method;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.eagle.common.config.EagleConfigFactory;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.repo.EntityRepositoryScanner;
@@ -31,6 +25,12 @@ import org.mockito.cglib.core.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
/**
* static initialization of all registered entities. As of now, dynamic registration is not supported
*/
@@ -365,11 +365,12 @@ public class EntityDefinitionManager {
q.setQualifierName(column.value());
EntitySerDeser<?> serDeser = _serDeserMap.get(fldCls);
if(serDeser == null){
- throw new IllegalArgumentException(fldCls.getName() + " in field " + f.getName() +
- " of entity " + cls.getSimpleName() + " has no serializer associated ");
- } else {
- q.setSerDeser((EntitySerDeser<Object>)serDeser);
+// throw new IllegalArgumentException(fldCls.getName() + " in field " + f.getName() +
+// " of entity " + cls.getSimpleName() + " has no serializer associated ");
+ serDeser = DefaultJavaObjctSerDeser.INSTANCE;
}
+
+ q.setSerDeser((EntitySerDeser<Object>)serDeser);
ed.getQualifierNameMap().put(q.getQualifierName(), q);
ed.getDisplayNameMap().put(q.getDisplayName(), q);
// TODO: should refine rules, consider fields like "hCol", getter method should be gethCol() according to org.apache.commons.beanutils.PropertyUtils
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpa-spark-history/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpa-spark-history/pom.xml b/eagle-jpm/eagle-jpa-spark-history/pom.xml
new file mode 100644
index 0000000..cc293b6
--- /dev/null
+++ b/eagle-jpm/eagle-jpa-spark-history/pom.xml
@@ -0,0 +1,66 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements.  See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License.  You may obtain a copy of the License at
  ~
  ~    http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
  -->

<!-- NOTE(review): this pom lives in directory "eagle-jpa-spark-history" but the
     artifactId below is "eagle-jpm-spark-history"; "jpa" looks like a typo for
     "jpm" - confirm the intended module directory name.
     Also note the parent version is hard-coded to 0.3.0-incubating while the
     sibling eagle-jpm-entity pom uses 0.5.0-incubating-SNAPSHOT - verify which
     is current. -->
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.apache.eagle</groupId>
        <artifactId>eagle-jpm-parent</artifactId>
        <version>0.3.0-incubating</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <artifactId>eagle-jpm-spark-history</artifactId>
    <name>eagle-jpm-spark-history</name>
    <url>http://maven.apache.org</url>
    <dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.eagle</groupId>
            <artifactId>eagle-stream-process-api</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.eagle</groupId>
            <artifactId>eagle-stream-process-base</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.eagle</groupId>
            <artifactId>eagle-job-common</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.jsoup</groupId>
            <artifactId>jsoup</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <exclusions>
                <!-- storm-core bundles logback; excluded so slf4j binding does not conflict -->
                <exclusion>
                    <groupId>ch.qos.logback</groupId>
                    <artifactId>logback-classic</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpa-spark-running/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpa-spark-running/pom.xml b/eagle-jpm/eagle-jpa-spark-running/pom.xml
new file mode 100644
index 0000000..42c476a
--- /dev/null
+++ b/eagle-jpm/eagle-jpa-spark-running/pom.xml
@@ -0,0 +1,66 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements.  See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License.  You may obtain a copy of the License at
  ~
  ~    http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
  -->

<!-- NOTE(review): directory "eagle-jpa-spark-running" vs artifactId
     "eagle-jpm-spark-running" - "jpa" looks like a typo for "jpm"; confirm.
     Parent version 0.3.0-incubating also disagrees with the
     0.5.0-incubating-SNAPSHOT used by the eagle-jpm-entity pom - verify. -->
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.apache.eagle</groupId>
        <artifactId>eagle-jpm-parent</artifactId>
        <version>0.3.0-incubating</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <artifactId>eagle-jpm-spark-running</artifactId>
    <name>eagle-jpm-spark-running</name>
    <url>http://maven.apache.org</url>
    <dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.eagle</groupId>
            <artifactId>eagle-stream-process-api</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.eagle</groupId>
            <artifactId>eagle-stream-process-base</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.eagle</groupId>
            <artifactId>eagle-job-common</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.jsoup</groupId>
            <artifactId>jsoup</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <exclusions>
                <!-- storm-core bundles logback; excluded so slf4j binding does not conflict -->
                <exclusion>
                    <groupId>ch.qos.logback</groupId>
                    <artifactId>logback-classic</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-entity/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/pom.xml b/eagle-jpm/eagle-jpm-entity/pom.xml
new file mode 100644
index 0000000..29be4ab
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/pom.xml
@@ -0,0 +1,52 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements.  See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License.  You may obtain a copy of the License at
  ~
  ~    http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
  -->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>eagle-jpm-parent</artifactId>
        <groupId>org.apache.eagle</groupId>
        <!-- NOTE(review): sibling spark-history/spark-running poms reference parent
             version 0.3.0-incubating; this one uses 0.5.0-incubating-SNAPSHOT -
             confirm which matches the actual eagle-jpm-parent pom. -->
        <version>0.5.0-incubating-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>eagle-jpm-entity</artifactId>
    <packaging>jar</packaging>

    <name>eagle-jpm-entity</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.eagle</groupId>
            <artifactId>eagle-jpm-util</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <!-- NOTE(review): junit 3.8.1 is the archetype default and very old;
                 consider aligning with the junit version managed elsewhere in the build. -->
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

</project>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/JPMEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/JPMEntityRepository.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/JPMEntityRepository.java
new file mode 100644
index 0000000..f54688b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/JPMEntityRepository.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.entity;
+
+import org.apache.eagle.log.entity.repo.EntityRepository;
+
/**
 * Entity repository that registers all Spark JPM (job performance monitoring)
 * entity classes so the Eagle entity framework can discover them.
 */
public class JPMEntityRepository extends EntityRepository {
    public JPMEntityRepository() {
        // entitySet is inherited from EntityRepository; classes added here are
        // presumably picked up by the entity definition scanner — TODO confirm
        // against EntityRepositoryScanner.
        entitySet.add(SparkApp.class);
        entitySet.add(SparkJob.class);
        entitySet.add(SparkStage.class);
        entitySet.add(SparkTask.class);
        entitySet.add(SparkExecutor.class);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/JobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/JobConfig.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/JobConfig.java
new file mode 100644
index 0000000..de3bd7a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/JobConfig.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.entity;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.TreeMap;
+
/**
 * Serializable wrapper around a map of job configuration key/value pairs.
 *
 * The backing {@link TreeMap} keeps keys sorted, so {@link #toString()} output
 * is deterministic for a given set of entries.
 */
public class JobConfig implements Serializable {
    // Explicit serialVersionUID: a Serializable class without one gets a
    // compiler-generated UID that changes with unrelated edits, breaking
    // cross-version deserialization.
    private static final long serialVersionUID = 1L;

    private Map<String, String> config = new TreeMap<>();

    /** Returns the live (mutable) configuration map. */
    public Map<String, String> getConfig() {
        return config;
    }

    /** Replaces the configuration map with the given one (stored by reference). */
    public void setConfig(Map<String, String> config) {
        this.config = config;
    }

    @Override
    public String toString() {
        return config.toString();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkApp.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkApp.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkApp.java
new file mode 100644
index 0000000..1760753
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkApp.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.apache.eagle.jpm.util.Constants;
+
+@Table("eglesprk_apps")
+@ColumnFamily("f")
+@Prefix("sprkapp")
+@Service(Constants.SPARK_APP_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName","user", "queue"})
+@Partition({"site"})
+public class SparkApp extends TaggedLogAPIEntity{
+
+ @Column("a")
+ private long startTime;
+ @Column("b")
+ private long endTime;
+ @Column("c")
+ private String yarnState;
+ @Column("d")
+ private String yarnStatus;
+ @Column("e")
+ private JobConfig config;
+ @Column("f")
+ private int numJobs;
+ @Column("g")
+ private int totalStages;
+ @Column("h")
+ private int skippedStages;
+ @Column("i")
+ private int failedStages;
+ @Column("j")
+ private int totalTasks;
+ @Column("k")
+ private int skippedTasks;
+ @Column("l")
+ private int failedTasks;
+ @Column("m")
+ private int executors;
+ @Column("n")
+ private long inputBytes;
+ @Column("o")
+ private long inputRecords;
+ @Column("p")
+ private long outputBytes;
+ @Column("q")
+ private long outputRecords;
+ @Column("r")
+ private long shuffleReadBytes;
+ @Column("s")
+ private long shuffleReadRecords;
+ @Column("t")
+ private long shuffleWriteBytes;
+ @Column("u")
+ private long shuffleWriteRecords;
+ @Column("v")
+ private long executorDeserializeTime;
+ @Column("w")
+ private long executorRunTime;
+ @Column("x")
+ private long resultSize;
+ @Column("y")
+ private long jvmGcTime;
+ @Column("z")
+ private long resultSerializationTime;
+ @Column("ab")
+ private long memoryBytesSpilled;
+ @Column("ac")
+ private long diskBytesSpilled;
+ @Column("ad")
+ private long execMemoryBytes;
+ @Column("ae")
+ private long driveMemoryBytes;
+ @Column("af")
+ private int completeTasks;
+ @Column("ag")
+ private long totalExecutorTime;
+ @Column("ah")
+ private long executorMemoryOverhead;
+ @Column("ai")
+ private long driverMemoryOverhead;
+ @Column("aj")
+ private int executorCores;
+ @Column("ak")
+ private int driverCores;
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+
+ public String getYarnState() {
+ return yarnState;
+ }
+
+ public String getYarnStatus() {
+ return yarnStatus;
+ }
+
+ public int getNumJobs() {
+ return numJobs;
+ }
+
+ public int getTotalStages() {
+ return totalStages;
+ }
+
+ public int getSkippedStages() {
+ return skippedStages;
+ }
+
+ public int getFailedStages() {
+ return failedStages;
+ }
+
+ public int getTotalTasks() {
+ return totalTasks;
+ }
+
+ public int getSkippedTasks() {
+ return skippedTasks;
+ }
+
+ public int getFailedTasks() {
+ return failedTasks;
+ }
+
+ public int getExecutors() {
+ return executors;
+ }
+
+ public long getInputBytes() {
+ return inputBytes;
+ }
+
+ public long getInputRecords() {
+ return inputRecords;
+ }
+
+ public long getOutputBytes() {
+ return outputBytes;
+ }
+
+ public long getOutputRecords() {
+ return outputRecords;
+ }
+
+ public long getShuffleReadBytes() {
+ return shuffleReadBytes;
+ }
+
+ public long getShuffleReadRecords() {
+ return shuffleReadRecords;
+ }
+
+ public long getShuffleWriteBytes() {
+ return shuffleWriteBytes;
+ }
+
+ public long getShuffleWriteRecords() {
+ return shuffleWriteRecords;
+ }
+
+ public long getExecutorDeserializeTime() {
+ return executorDeserializeTime;
+ }
+
+ public long getExecutorRunTime() {
+ return executorRunTime;
+ }
+
+ public long getResultSize() {
+ return resultSize;
+ }
+
+ public long getJvmGcTime() {
+ return jvmGcTime;
+ }
+
+ public long getResultSerializationTime() {
+ return resultSerializationTime;
+ }
+
+ public long getMemoryBytesSpilled() {
+ return memoryBytesSpilled;
+ }
+
+ public long getDiskBytesSpilled() {
+ return diskBytesSpilled;
+ }
+
+ public long getExecMemoryBytes() {
+ return execMemoryBytes;
+ }
+
+ public long getDriveMemoryBytes() {
+ return driveMemoryBytes;
+ }
+
+ public int getCompleteTasks(){ return completeTasks;}
+
+ public JobConfig getConfig() {
+ return config;
+ }
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ valueChanged("startTime");
+ }
+
+ public void setEndTime(long endTime) {
+ this.endTime = endTime;
+ valueChanged("endTime");
+ }
+
+ public void setYarnState(String yarnState) {
+ this.yarnState = yarnState;
+ valueChanged("yarnState");
+ }
+
+ public void setYarnStatus(String yarnStatus) {
+ this.yarnStatus = yarnStatus;
+ valueChanged("yarnStatus");
+ }
+
+ public void setConfig(JobConfig config) {
+ this.config = config;
+ valueChanged("config");
+ }
+
+ public void setNumJobs(int numJobs) {
+ this.numJobs = numJobs;
+ valueChanged("numJobs");
+ }
+
+ public void setTotalStages(int totalStages) {
+ this.totalStages = totalStages;
+ valueChanged("totalStages");
+ }
+
+ public void setSkippedStages(int skippedStages) {
+ this.skippedStages = skippedStages;
+ valueChanged("skippedStages");
+ }
+
+ public void setFailedStages(int failedStages) {
+ this.failedStages = failedStages;
+ valueChanged("failedStages");
+ }
+
+ public void setTotalTasks(int totalTasks) {
+ this.totalTasks = totalTasks;
+ valueChanged("totalTasks");
+ }
+
+ public void setSkippedTasks(int skippedTasks) {
+ this.skippedTasks = skippedTasks;
+ valueChanged("skippedTasks");
+ }
+
+ public void setFailedTasks(int failedTasks) {
+ this.failedTasks = failedTasks;
+ valueChanged("failedTasks");
+ }
+
+ public void setExecutors(int executors) {
+ this.executors = executors;
+ valueChanged("executors");
+ }
+
+ public void setInputBytes(long inputBytes) {
+ this.inputBytes = inputBytes;
+ valueChanged("inputBytes");
+ }
+
+ public void setInputRecords(long inputRecords) {
+ this.inputRecords = inputRecords;
+ valueChanged("inputRecords");
+ }
+
+ public void setOutputBytes(long outputBytes) {
+ this.outputBytes = outputBytes;
+ valueChanged("outputBytes");
+ }
+
+ public void setOutputRecords(long outputRecords) {
+ this.outputRecords = outputRecords;
+ valueChanged("outputRecords");
+ }
+
+ public void setShuffleReadBytes(long shuffleReadRemoteBytes) {
+ this.shuffleReadBytes = shuffleReadRemoteBytes;
+ valueChanged("shuffleReadBytes");
+ }
+
+ public void setShuffleReadRecords(long shuffleReadRecords) {
+ this.shuffleReadRecords = shuffleReadRecords;
+ valueChanged("shuffleReadRecords");
+ }
+
+ public void setShuffleWriteBytes(long shuffleWriteBytes) {
+ this.shuffleWriteBytes = shuffleWriteBytes;
+ valueChanged("shuffleWriteBytes");
+ }
+
+ public void setShuffleWriteRecords(long shuffleWriteRecords) {
+ this.shuffleWriteRecords = shuffleWriteRecords;
+ valueChanged("shuffleWriteRecords");
+ }
+
+ public void setExecutorDeserializeTime(long executorDeserializeTime) {
+ this.executorDeserializeTime = executorDeserializeTime;
+ valueChanged("executorDeserializeTime");
+ }
+
+ public void setExecutorRunTime(long executorRunTime) {
+ this.executorRunTime = executorRunTime;
+ valueChanged("executorRunTime");
+ }
+
+ public void setResultSize(long resultSize) {
+ this.resultSize = resultSize;
+ valueChanged("resultSize");
+ }
+
+ public void setJvmGcTime(long jvmGcTime) {
+ this.jvmGcTime = jvmGcTime;
+ valueChanged("jvmGcTime");
+ }
+
+ public void setResultSerializationTime(long resultSerializationTime) {
+ this.resultSerializationTime = resultSerializationTime;
+ valueChanged("resultSerializationTime");
+ }
+
+ public void setMemoryBytesSpilled(long memoryBytesSpilled) {
+ this.memoryBytesSpilled = memoryBytesSpilled;
+ valueChanged("memoryBytesSpilled");
+ }
+
+ public void setDiskBytesSpilled(long diskBytesSpilled) {
+ this.diskBytesSpilled = diskBytesSpilled;
+ valueChanged("diskBytesSpilled");
+ }
+
+ public void setExecMemoryBytes(long execMemoryBytes) {
+ this.execMemoryBytes = execMemoryBytes;
+ valueChanged("execMemoryBytes");
+ }
+
+ public void setDriveMemoryBytes(long driveMemoryBytes) {
+ this.driveMemoryBytes = driveMemoryBytes;
+ valueChanged("driveMemoryBytes");
+ }
+
+ public void setCompleteTasks(int completeTasks){
+ this.completeTasks = completeTasks;
+ valueChanged("completeTasks");
+ }
+
+ public long getTotalExecutorTime() {
+ return totalExecutorTime;
+ }
+
+ public void setTotalExecutorTime(long totalExecutorTime) {
+ this.totalExecutorTime = totalExecutorTime;
+ valueChanged("totalExecutorTime");
+ }
+
+ public long getExecutorMemoryOverhead() {
+ return executorMemoryOverhead;
+ }
+
+ public void setExecutorMemoryOverhead(long executorMemoryOverhead) {
+ this.executorMemoryOverhead = executorMemoryOverhead;
+ valueChanged("executorMemoryOverhead");
+ }
+
+ public long getDriverMemoryOverhead() {
+ return driverMemoryOverhead;
+ }
+
+ public void setDriverMemoryOverhead(long driverMemoryOverhead) {
+ this.driverMemoryOverhead = driverMemoryOverhead;
+ valueChanged("driverMemoryOverhead");
+ }
+
+ public int getExecutorCores() {
+ return executorCores;
+ }
+
+ public void setExecutorCores(int executorCores) {
+ this.executorCores = executorCores;
+ valueChanged("executorCores");
+ }
+
+ public int getDriverCores() {
+ return driverCores;
+ }
+
+ public void setDriverCores(int driverCores) {
+ this.driverCores = driverCores;
+ valueChanged("driverCores");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkExecutor.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkExecutor.java
new file mode 100644
index 0000000..92cb130
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkExecutor.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.apache.eagle.jpm.util.Constants;
+
+@Table("eglesprk_executors")
+@ColumnFamily("f")
+@Prefix("sprkexcutr")
+@Service(Constants.SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName", "executorId","user", "queue"})
+@Partition({"site"})
+public class SparkExecutor extends TaggedLogAPIEntity{
+
+ @Column("a")
+ private String hostPort;
+ @Column("b")
+ private int rddBlocks;
+ @Column("c")
+ private long memoryUsed;
+ @Column("d")
+ private long diskUsed;
+ @Column("e")
+ private int activeTasks = 0;
+ @Column("f")
+ private int failedTasks = 0;
+ @Column("g")
+ private int completedTasks = 0;
+ @Column("h")
+ private int totalTasks = 0;
+ @Column("i")
+ private long totalDuration = 0;
+ @Column("j")
+ private long totalInputBytes = 0;
+ @Column("k")
+ private long totalShuffleRead = 0;
+ @Column("l")
+ private long totalShuffleWrite = 0;
+ @Column("m")
+ private long maxMemory;
+ @Column("n")
+ private long startTime;
+ @Column("o")
+ private long endTime = 0;
+ @Column("p")
+ private long execMemoryBytes;
+ @Column("q")
+ private int cores;
+ @Column("r")
+ private long memoryOverhead;
+
+ public String getHostPort() {
+ return hostPort;
+ }
+
+ public void setHostPort(String hostPort) {
+ this.hostPort = hostPort;
+ this.valueChanged("hostPort");
+ }
+
+ public int getRddBlocks() {
+ return rddBlocks;
+ }
+
+ public void setRddBlocks(int rddBlocks) {
+ this.rddBlocks = rddBlocks;
+ this.valueChanged("rddBlocks");
+ }
+
+ public long getMemoryUsed() {
+ return memoryUsed;
+ }
+
+ public void setMemoryUsed(long memoryUsed) {
+ this.memoryUsed = memoryUsed;
+ this.valueChanged("memoryUsed");
+ }
+
+ public long getDiskUsed() {
+ return diskUsed;
+ }
+
+ public void setDiskUsed(long diskUsed) {
+ this.diskUsed = diskUsed;
+ this.valueChanged("diskUsed");
+ }
+
+ public int getActiveTasks() {
+ return activeTasks;
+ }
+
+ public void setActiveTasks(int activeTasks) {
+ this.activeTasks = activeTasks;
+ this.valueChanged("activeTasks");
+ }
+
+ public int getFailedTasks() {
+ return failedTasks;
+ }
+
+ public void setFailedTasks(int failedTasks) {
+ this.failedTasks = failedTasks;
+ this.valueChanged("failedTasks");
+ }
+
+ public int getCompletedTasks() {
+ return completedTasks;
+ }
+
+ public void setCompletedTasks(int completedTasks) {
+ this.completedTasks = completedTasks;
+ this.valueChanged("completedTasks");
+ }
+
+ public int getTotalTasks() {
+ return totalTasks;
+ }
+
+ public void setTotalTasks(int totalTasks) {
+ this.totalTasks = totalTasks;
+ this.valueChanged("totalTasks");
+ }
+
+ public long getTotalDuration() {
+ return totalDuration;
+ }
+
+ public void setTotalDuration(long totalDuration) {
+ this.totalDuration = totalDuration;
+ this.valueChanged("totalDuration");
+ }
+
+ public long getTotalInputBytes() {
+ return totalInputBytes;
+ }
+
+ public void setTotalInputBytes(long totalInputBytes) {
+ this.totalInputBytes = totalInputBytes;
+ this.valueChanged("totalInputBytes");
+ }
+
+ public long getTotalShuffleRead() {
+ return totalShuffleRead;
+ }
+
+ public void setTotalShuffleRead(long totalShuffleRead) {
+ this.totalShuffleRead = totalShuffleRead;
+ this.valueChanged("totalShuffleRead");
+ }
+
+ public long getTotalShuffleWrite() {
+ return totalShuffleWrite;
+ }
+
+ public void setTotalShuffleWrite(long totalShuffleWrite) {
+ this.totalShuffleWrite = totalShuffleWrite;
+ this.valueChanged("totalShuffleWrite");
+ }
+
+ public long getMaxMemory() {
+ return maxMemory;
+ }
+
+ public void setMaxMemory(long maxMemory) {
+ this.maxMemory = maxMemory;
+ this.valueChanged("maxMemory");
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ valueChanged("startTime");
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+
+ public void setEndTime(long endTime) {
+ this.endTime = endTime;
+ this.valueChanged("endTime");
+ }
+
+ public long getExecMemoryBytes() {
+ return execMemoryBytes;
+ }
+
+ public void setExecMemoryBytes(long execMemoryBytes) {
+ this.execMemoryBytes = execMemoryBytes;
+ this.valueChanged("execMemoryBytes");
+ }
+
+ public int getCores() {
+ return cores;
+ }
+
+ public void setCores(int cores) {
+ this.cores = cores;
+ valueChanged("cores");
+ }
+
+ public long getMemoryOverhead() {
+ return memoryOverhead;
+ }
+
+ public void setMemoryOverhead(long memoryOverhead) {
+ this.memoryOverhead = memoryOverhead;
+ valueChanged("memoryOverhead");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkJob.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkJob.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkJob.java
new file mode 100644
index 0000000..a641440
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkJob.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.apache.eagle.jpm.util.Constants;
+
+@Table("eglesprk_jobs")
+@ColumnFamily("f")
+@Prefix("sprkjob")
+@Service(Constants.SPARK_JOB_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName", "jobId","user", "queue"})
+@Partition({"site"})
+public class SparkJob extends TaggedLogAPIEntity{
+
+ @Column("a")
+ private long submissionTime;
+ @Column("b")
+ private long completionTime;
+ @Column("c")
+ private int numStages=0;
+ @Column("d")
+ private String status;
+ @Column("e")
+ private int numTask=0;
+ @Column("f")
+ private int numActiveTasks=0;
+ @Column("g")
+ private int numCompletedTasks=0;
+ @Column("h")
+ private int numSkippedTasks=0;
+ @Column("i")
+ private int numFailedTasks=0;
+ @Column("j")
+ private int numActiveStages=0;
+ @Column("k")
+ private int numCompletedStages=0;
+ @Column("l")
+ private int numSkippedStages=0;
+ @Column("m")
+ private int numFailedStages=0;
+
+ public long getSubmissionTime() {
+ return submissionTime;
+ }
+
+ public long getCompletionTime() {
+ return completionTime;
+ }
+
+ public int getNumStages() {
+ return numStages;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public int getNumTask() {
+ return numTask;
+ }
+
+ public int getNumActiveTasks() {
+ return numActiveTasks;
+ }
+
+ public int getNumCompletedTasks() {
+ return numCompletedTasks;
+ }
+
+ public int getNumSkippedTasks() {
+ return numSkippedTasks;
+ }
+
+ public int getNumFailedTasks() {
+ return numFailedTasks;
+ }
+
+ public int getNumActiveStages() {
+ return numActiveStages;
+ }
+
+ public int getNumCompletedStages() {
+ return numCompletedStages;
+ }
+
+ public int getNumSkippedStages() {
+ return numSkippedStages;
+ }
+
+ public int getNumFailedStages() {
+ return numFailedStages;
+ }
+
+ public void setSubmissionTime(long submissionTime) {
+ this.submissionTime = submissionTime;
+ this.valueChanged("submissionTime");
+ }
+
+ public void setCompletionTime(long completionTime) {
+ this.completionTime = completionTime;
+ this.valueChanged("completionTime");
+ }
+
+ public void setNumStages(int numStages) {
+ this.numStages = numStages;
+ this.valueChanged("numStages");
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ this.valueChanged("status");
+ }
+
+ public void setNumTask(int numTask) {
+ this.numTask = numTask;
+ this.valueChanged("numTask");
+ }
+
+ public void setNumActiveTasks(int numActiveTasks) {
+ this.numActiveTasks = numActiveTasks;
+ this.valueChanged("numActiveTasks");
+ }
+
+ public void setNumCompletedTasks(int numCompletedTasks) {
+ this.numCompletedTasks = numCompletedTasks;
+ this.valueChanged("numCompletedTasks");
+ }
+
+ public void setNumSkippedTasks(int numSkippedTasks) {
+ this.numSkippedTasks = numSkippedTasks;
+ this.valueChanged("numSkippedTasks");
+ }
+
+ public void setNumFailedTasks(int numFailedTasks) {
+ this.numFailedTasks = numFailedTasks;
+ this.valueChanged("numFailedTasks");
+ }
+
+ public void setNumActiveStages(int numActiveStages) {
+ this.numActiveStages = numActiveStages;
+ this.valueChanged("numActiveStages");
+ }
+
+ public void setNumCompletedStages(int numCompletedStages) {
+ this.numCompletedStages = numCompletedStages;
+ this.valueChanged("numCompletedStages");
+ }
+
+ public void setNumSkippedStages(int numSkippedStages) {
+ this.numSkippedStages = numSkippedStages;
+ this.valueChanged("numSkippedStages");
+ }
+
+ public void setNumFailedStages(int numFailedStages) {
+ this.numFailedStages = numFailedStages;
+ this.valueChanged("numFailedStages");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkStage.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkStage.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkStage.java
new file mode 100644
index 0000000..92714bf
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkStage.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.apache.eagle.jpm.util.Constants;
+
+@Table("eglesprk_stages")
+@ColumnFamily("f")
+@Prefix("sprkstage")
+@Service(Constants.SPARK_STAGE_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName", "jobId", "stageId","stageAttemptId","user", "queue"})
+@Partition({"site"})
+public class SparkStage extends TaggedLogAPIEntity{
+
+ @Column("a")
+ private String status;
+ @Column("b")
+ private int numActiveTasks=0;
+ @Column("c")
+ private int numCompletedTasks=0;
+ @Column("d")
+ private int numFailedTasks=0;
+ @Column("e")
+ private long executorRunTime=0l;
+ @Column("f")
+ private long inputBytes=0l;
+ @Column("g")
+ private long inputRecords=0l;
+ @Column("h")
+ private long outputBytes=0l;
+ @Column("i")
+ private long outputRecords=0l;
+ @Column("j")
+ private long shuffleReadBytes=0l;
+ @Column("k")
+ private long shuffleReadRecords=0l;
+ @Column("l")
+ private long shuffleWriteBytes=0l;
+ @Column("m")
+ private long shuffleWriteRecords=0l;
+ @Column("n")
+ private long memoryBytesSpilled=0l;
+ @Column("o")
+ private long diskBytesSpilled=0l;
+ @Column("p")
+ private String name;
+ @Column("q")
+ private String schedulingPool;
+ @Column("r")
+ private long submitTime;
+ @Column("s")
+ private long completeTime;
+ @Column("t")
+ private int numTasks;
+ @Column("u")
+ private long executorDeserializeTime;
+ @Column("v")
+ private long resultSize;
+ @Column("w")
+ private long jvmGcTime;
+ @Column("x")
+ private long resultSerializationTime;
+
+ public String getStatus() {
+ return status;
+ }
+
+ public int getNumActiveTasks() {
+ return numActiveTasks;
+ }
+
+ public int getNumCompletedTasks() {
+ return numCompletedTasks;
+ }
+
+ public int getNumFailedTasks() {
+ return numFailedTasks;
+ }
+
+ public long getExecutorRunTime() {
+ return executorRunTime;
+ }
+
+ public long getInputBytes() {
+ return inputBytes;
+ }
+
+ public long getInputRecords() {
+ return inputRecords;
+ }
+
+ public long getOutputBytes() {
+ return outputBytes;
+ }
+
+ public long getOutputRecords() {
+ return outputRecords;
+ }
+
+ public long getShuffleReadBytes() {
+ return shuffleReadBytes;
+ }
+
+ public long getShuffleReadRecords() {
+ return shuffleReadRecords;
+ }
+
+ public long getShuffleWriteBytes() {
+ return shuffleWriteBytes;
+ }
+
+ public long getShuffleWriteRecords() {
+ return shuffleWriteRecords;
+ }
+
+ public long getMemoryBytesSpilled() {
+ return memoryBytesSpilled;
+ }
+
+ public long getDiskBytesSpilled() {
+ return diskBytesSpilled;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getSchedulingPool() {
+ return schedulingPool;
+ }
+
+ public long getSubmitTime() {
+ return submitTime;
+ }
+
+ public long getCompleteTime() {
+ return completeTime;
+ }
+
+ public int getNumTasks() {
+ return numTasks;
+ }
+
+ public long getExecutorDeserializeTime() {
+ return executorDeserializeTime;
+ }
+
+ public long getResultSize() {
+ return resultSize;
+ }
+
+ public long getJvmGcTime() {
+ return jvmGcTime;
+ }
+
+ public long getResultSerializationTime() {
+ return resultSerializationTime;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ this.valueChanged("status");
+ }
+
+ public void setNumActiveTasks(int numActiveTasks) {
+ this.numActiveTasks = numActiveTasks;
+ this.valueChanged("numActiveTasks");
+ }
+
+ public void setNumCompletedTasks(int numCompletedTasks) {
+ this.numCompletedTasks = numCompletedTasks;
+ this.valueChanged("numCompletedTasks");
+ }
+
+ public void setNumFailedTasks(int numFailedTasks) {
+ this.numFailedTasks = numFailedTasks;
+ this.valueChanged("numFailedTasks");
+ }
+
+ public void setExecutorRunTime(long executorRunTime) {
+ this.executorRunTime = executorRunTime;
+ this.valueChanged("executorRunTime");
+ }
+
+ public void setInputBytes(long inputBytes) {
+ this.inputBytes = inputBytes;
+ this.valueChanged("inputBytes");
+ }
+
+ public void setInputRecords(long inputRecords) {
+ this.inputRecords = inputRecords;
+ this.valueChanged("inputRecords");
+ }
+
+ public void setOutputBytes(long outputBytes) {
+ this.outputBytes = outputBytes;
+ this.valueChanged("outputBytes");
+ }
+
+ public void setOutputRecords(long outputRecords) {
+ this.outputRecords = outputRecords;
+ this.valueChanged("outputRecords");
+ }
+
+ public void setShuffleReadBytes(long shuffleReadBytes) {
+ this.shuffleReadBytes = shuffleReadBytes;
+ this.valueChanged("shuffleReadBytes");
+ }
+
+ public void setShuffleReadRecords(long shuffleReadRecords) {
+ this.shuffleReadRecords = shuffleReadRecords;
+ this.valueChanged("shuffleReadRecords");
+ }
+
+ public void setShuffleWriteBytes(long shuffleWriteBytes) {
+ this.shuffleWriteBytes = shuffleWriteBytes;
+ this.valueChanged("shuffleWriteBytes");
+ }
+
+ public void setShuffleWriteRecords(long shuffleWriteRecords) {
+ this.shuffleWriteRecords = shuffleWriteRecords;
+ this.valueChanged("shuffleWriteRecords");
+ }
+
+ public void setMemoryBytesSpilled(long memoryBytesSpilled) {
+ this.memoryBytesSpilled = memoryBytesSpilled;
+ this.valueChanged("memoryBytesSpilled");
+ }
+
+ public void setDiskBytesSpilled(long diskBytesSpilled) {
+ this.diskBytesSpilled = diskBytesSpilled;
+ this.valueChanged("diskBytesSpilled");
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ this.valueChanged("name");
+ }
+
+ public void setSchedulingPool(String schedulingPool) {
+ this.schedulingPool = schedulingPool;
+ this.valueChanged("schedulingPool");
+ }
+
+ public void setSubmitTime(long submitTime) {
+ this.submitTime = submitTime;
+ this.valueChanged("submitTime");
+ }
+
+ public void setCompleteTime(long completeTime) {
+ this.completeTime = completeTime;
+ this.valueChanged("completeTime");
+ }
+
+ public void setNumTasks(int numTasks) {
+ this.numTasks = numTasks;
+ valueChanged("numTasks");
+ }
+
+ public void setExecutorDeserializeTime(long executorDeserializeTime) {
+ this.executorDeserializeTime = executorDeserializeTime;
+ valueChanged("executorDeserializeTime");
+ }
+
+ public void setResultSize(long resultSize) {
+ this.resultSize = resultSize;
+ valueChanged("resultSize");
+ }
+
+ public void setJvmGcTime(long jvmGcTime) {
+ this.jvmGcTime = jvmGcTime;
+ valueChanged("jvmGcTime");
+ }
+
+ public void setResultSerializationTime(long resultSerializationTime) {
+ this.resultSerializationTime = resultSerializationTime;
+ valueChanged("resultSerializationTime");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkTask.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkTask.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkTask.java
new file mode 100644
index 0000000..af9ed21
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkTask.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.apache.eagle.jpm.util.Constants;
+
+/**
+ * Time-series entity describing one Spark task attempt, persisted under
+ * table "eglesprk_tasks" (column family "f", row-key prefix "sprktask")
+ * and served via the Spark task service endpoint.
+ *
+ * Fields cover task identity/placement, per-phase timings, spill volumes,
+ * and input/output/shuffle byte and record counters. Every setter calls
+ * valueChanged(...) (from TaggedLogAPIEntity) so the framework knows
+ * which fields were modified.
+ *
+ * Row identity is the tag set below (site, app/attempt, job, stage/attempt,
+ * task index/attempt, user, queue); partitioning is by "site".
+ */
+@Table("eglesprk_tasks")
+@ColumnFamily("f")
+@Prefix("sprktask")
+@Service(Constants.SPARK_TASK_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName", "jobId", "jobName", "stageId","stageAttemptId","taskIndex","taskAttemptId","user", "queue"})
+@Partition({"site"})
+public class SparkTask extends TaggedLogAPIEntity{
+
+ // --- task identity and placement ---
+ @Column("a")
+ private int taskId;
+ @Column("b")
+ private long launchTime;
+ @Column("c")
+ private String executorId;
+ @Column("d")
+ private String host;
+ @Column("e")
+ private String taskLocality;
+ @Column("f")
+ private boolean speculative;
+ // --- timing metrics (milliseconds, per Spark task metrics) ---
+ @Column("g")
+ private long executorDeserializeTime;
+ @Column("h")
+ private long executorRunTime;
+ @Column("i")
+ private long resultSize;
+ @Column("j")
+ private long jvmGcTime;
+ @Column("k")
+ private long resultSerializationTime;
+ // --- spill and I/O counters (bytes / record counts) ---
+ @Column("l")
+ private long memoryBytesSpilled;
+ @Column("m")
+ private long diskBytesSpilled;
+ @Column("n")
+ private long inputBytes;
+ @Column("o")
+ private long inputRecords;
+ @Column("p")
+ private long outputBytes;
+ @Column("q")
+ private long outputRecords;
+ @Column("r")
+ private long shuffleReadRemoteBytes;
+ // NOTE(review): qualifier "x" breaks the otherwise alphabetical a..v
+ // sequence — presumably this field was added later; confirm it does not
+ // collide with any other qualifier in this column family.
+ @Column("x")
+ private long shuffleReadLocalBytes;
+ @Column("s")
+ private long shuffleReadRecords;
+ @Column("t")
+ private long shuffleWriteBytes;
+ @Column("u")
+ private long shuffleWriteRecords;
+ @Column("v")
+ private boolean failed;
+
+ // --- getters (plain accessors, no change tracking) ---
+ public int getTaskId() {
+ return taskId;
+ }
+
+ public long getLaunchTime() {
+ return launchTime;
+ }
+
+ public String getExecutorId() {
+ return executorId;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public String getTaskLocality() {
+ return taskLocality;
+ }
+
+ public boolean isSpeculative() {
+ return speculative;
+ }
+
+ public long getExecutorDeserializeTime() {
+ return executorDeserializeTime;
+ }
+
+ public long getExecutorRunTime() {
+ return executorRunTime;
+ }
+
+ public long getResultSize() {
+ return resultSize;
+ }
+
+ public long getJvmGcTime() {
+ return jvmGcTime;
+ }
+
+ public long getResultSerializationTime() {
+ return resultSerializationTime;
+ }
+
+ public long getMemoryBytesSpilled() {
+ return memoryBytesSpilled;
+ }
+
+ public long getDiskBytesSpilled() {
+ return diskBytesSpilled;
+ }
+
+ public long getInputBytes() {
+ return inputBytes;
+ }
+
+ public long getInputRecords() {
+ return inputRecords;
+ }
+
+ public long getOutputBytes() {
+ return outputBytes;
+ }
+
+ public long getOutputRecords() {
+ return outputRecords;
+ }
+
+ public long getShuffleReadRecords() {
+ return shuffleReadRecords;
+ }
+
+ public long getShuffleWriteBytes() {
+ return shuffleWriteBytes;
+ }
+
+ public long getShuffleWriteRecords() {
+ return shuffleWriteRecords;
+ }
+
+ public boolean isFailed() {
+ return failed;
+ }
+
+ public long getShuffleReadRemoteBytes() {
+ return shuffleReadRemoteBytes;
+ }
+
+ public long getShuffleReadLocalBytes() {
+ return shuffleReadLocalBytes;
+ }
+
+ // --- setters: each assigns the field and calls valueChanged(...) so the
+ // entity framework can track modified fields (mixed bare vs this-qualified
+ // calls below are stylistic only) ---
+ public void setFailed(boolean failed) {
+ this.failed = failed;
+ valueChanged("failed");
+ }
+
+ public void setTaskId(int taskId) {
+ this.taskId = taskId;
+ valueChanged("taskId");
+ }
+
+ public void setLaunchTime(long launchTime) {
+ this.launchTime = launchTime;
+ valueChanged("launchTime");
+ }
+
+ public void setExecutorId(String executorId) {
+ this.executorId = executorId;
+ valueChanged("executorId");
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ this.valueChanged("host");
+ }
+
+ public void setTaskLocality(String taskLocality) {
+ this.taskLocality = taskLocality;
+ this.valueChanged("taskLocality");
+ }
+
+ public void setSpeculative(boolean speculative) {
+ this.speculative = speculative;
+ this.valueChanged("speculative");
+ }
+
+ public void setExecutorDeserializeTime(long executorDeserializeTime) {
+ this.executorDeserializeTime = executorDeserializeTime;
+ this.valueChanged("executorDeserializeTime");
+ }
+
+ public void setExecutorRunTime(long executorRunTime) {
+ this.executorRunTime = executorRunTime;
+ this.valueChanged("executorRunTime");
+ }
+
+ public void setResultSize(long resultSize) {
+ this.resultSize = resultSize;
+ this.valueChanged("resultSize");
+ }
+
+ public void setJvmGcTime(long jvmGcTime) {
+ this.jvmGcTime = jvmGcTime;
+ this.valueChanged("jvmGcTime");
+ }
+
+ public void setResultSerializationTime(long resultSerializationTime) {
+ this.resultSerializationTime = resultSerializationTime;
+ this.valueChanged("resultSerializationTime");
+ }
+
+ public void setMemoryBytesSpilled(long memoryBytesSpilled) {
+ this.memoryBytesSpilled = memoryBytesSpilled;
+ this.valueChanged("memoryBytesSpilled");
+ }
+
+ public void setDiskBytesSpilled(long diskBytesSpilled) {
+ this.diskBytesSpilled = diskBytesSpilled;
+ this.valueChanged("diskBytesSpilled");
+ }
+
+ public void setInputBytes(long inputBytes) {
+ this.inputBytes = inputBytes;
+ this.valueChanged("inputBytes");
+ }
+
+ public void setInputRecords(long inputRecords) {
+ this.inputRecords = inputRecords;
+ this.valueChanged("inputRecords");
+ }
+
+ public void setOutputBytes(long outputBytes) {
+ this.outputBytes = outputBytes;
+ this.valueChanged("outputBytes");
+ }
+
+ public void setOutputRecords(long outputRecords) {
+ this.outputRecords = outputRecords;
+ this.valueChanged("outputRecords");
+ }
+
+
+
+ public void setShuffleReadRecords(long shuffleReadRecords) {
+ this.shuffleReadRecords = shuffleReadRecords;
+ this.valueChanged("shuffleReadRecords");
+ }
+
+ public void setShuffleWriteBytes(long shuffleWriteBytes) {
+ this.shuffleWriteBytes = shuffleWriteBytes;
+ this.valueChanged("shuffleWriteBytes");
+ }
+
+ public void setShuffleWriteRecords(long shuffleWriteRecords) {
+ this.shuffleWriteRecords = shuffleWriteRecords;
+ this.valueChanged("shuffleWriteRecords");
+ }
+
+ public void setShuffleReadRemoteBytes(long shuffleReadRemoteBytes) {
+ this.shuffleReadRemoteBytes = shuffleReadRemoteBytes;
+ this.valueChanged("shuffleReadRemoteBytes");
+ }
+
+ public void setShuffleReadLocalBytes(long shuffleReadLocalBytes) {
+ this.shuffleReadLocalBytes = shuffleReadLocalBytes;
+ this.valueChanged("shuffleReadLocalBytes");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/pom.xml b/eagle-jpm/eagle-jpm-mr-history/pom.xml
new file mode 100644
index 0000000..97be7ec
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/pom.xml
@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>eagle-jpm-parent</artifactId>
+ <groupId>org.apache.eagle</groupId>
+ <version>0.5.0-incubating-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>eagle-jpm-mr-history</artifactId>
+ <name>eagle-jpm-mr-history</name>
+ <url>http://maven.apache.org</url>
+ <dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-jpm-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-stream-process-api</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.wso2.orbit.com.lmax</groupId>
+ <artifactId>disruptor</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>asm</groupId>
+ <artifactId>asm</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-stream-process-base</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.wso2.orbit.com.lmax</groupId>
+ <artifactId>disruptor</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>asm</groupId>
+ <artifactId>asm</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-app</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptor>src/assembly/eagle-jpm-mr-history-assembly.xml</descriptor>
+ <finalName>eagle-jpm-mr-history-${project.version}</finalName>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <tarLongFileMode>posix</tarLongFileMode>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/assembly/eagle-jpm-mr-history-assembly.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/assembly/eagle-jpm-mr-history-assembly.xml b/eagle-jpm/eagle-jpm-mr-history/src/assembly/eagle-jpm-mr-history-assembly.xml
new file mode 100644
index 0000000..cf6d108
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/assembly/eagle-jpm-mr-history-assembly.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <id>assembly</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory>/</outputDirectory>
+ <useProjectArtifact>false</useProjectArtifact>
+ <unpack>true</unpack>
+ <scope>runtime</scope>
+ <!--includes>
+ <include>org.apache.hadoop:hadoop-common</include>
+ <include>org.apache.hadoop:hadoop-hdfs</include>
+ <include>org.apache.hadoop:hadoop-client</include>
+ <include>org.apache.hadoop:hadoop-auth</include>
+ <include>org.apache.eagle:eagle-stream-process-api</include>
+ <include>org.apache.eagle:eagle-stream-process-base</include>
+ <include>org.jsoup:jsoup</include>
+ </includes-->
+ <excludes>
+ <exclude>org.wso2.orbit.com.lmax:disruptor</exclude>
+ <exclude>asm:asm</exclude>
+ <exclude>org.apache.storm:storm-core</exclude>
+ </excludes>
+ </dependencySet>
+ </dependencySets>
+ <fileSets>
+ <fileSet>
+ <directory>${project.build.outputDirectory}/</directory>
+ <outputDirectory>/</outputDirectory>
+ <!--<includes>-->
+ <!--<include>*.conf</include>-->
+ <!--<include>*.xml</include>-->
+ <!--<include>*.properties</include>-->
+ <!--<include>*.config</include>-->
+ <!--<include>classes/META-INF/*</include>-->
+ <!--</includes>-->
+
+ <excludes>
+ <exclude>*.yaml</exclude>
+ </excludes>
+ </fileSet>
+ </fileSets>
+</assembly>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
new file mode 100644
index 0000000..7c0530d
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.topology.TopologyBuilder;
+import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.common.JPAConstants;
+import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
+import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilterBuilder;
+import org.apache.eagle.jpm.mr.history.storm.HistoryJobProgressBolt;
+import org.apache.eagle.jpm.mr.history.storm.JobHistorySpout;
+
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Entry point of the MapReduce history-job monitoring Storm topology.
+ *
+ * Flow: (1) load configuration via JHFConfigManager, (2) build a
+ * JobHistoryContentFilter whose job-conf key patterns come from the
+ * "MRConfigureKeys" config list plus the fixed CASCADING/HIVE/PIG/SCOOBI
+ * keys, (3) wire a JobHistorySpout feeding a HistoryJobProgressBolt and
+ * submit the topology — remotely unless env is "local".
+ */
+public class MRHistoryJobMain {
+ public static void main(String []args) {
+ try {
+ //1. trigger init conf
+ JHFConfigManager jhfConfigManager = JHFConfigManager.getInstance(args);
+
+ //2. init JobHistoryContentFilter
+ JobHistoryContentFilterBuilder builder = JobHistoryContentFilterBuilder.newBuilder().acceptJobFile().acceptJobConfFile();
+ List<String> confKeyPatterns = jhfConfigManager.getConfig().getStringList("MRConfigureKeys");
+ confKeyPatterns.add(JPAConstants.JobConfiguration.CASCADING_JOB);
+ confKeyPatterns.add(JPAConstants.JobConfiguration.HIVE_JOB);
+ confKeyPatterns.add(JPAConstants.JobConfiguration.PIG_JOB);
+ confKeyPatterns.add(JPAConstants.JobConfiguration.SCOOBI_JOB);
+
+ // Each configured key is treated as a regex pattern for job-conf matching.
+ for (String key : confKeyPatterns) {
+ builder.includeJobKeyPatterns(Pattern.compile(key));
+ }
+ JobHistoryContentFilter filter = builder.build();
+
+ //3. init topology
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
+ String topologyName = "mrHistoryJobTopology";
+ String spoutName = "mrHistoryJobExecutor";
+ String boltName = "updateProcessTime";
+ int parallelism = jhfConfigManager.getConfig().getInt("envContextConfig.parallelismConfig." + spoutName);
+ int tasks = jhfConfigManager.getConfig().getInt("envContextConfig.tasks." + spoutName);
+ // Clamp parallelism so the executor count never exceeds the task count.
+ if (parallelism > tasks) {
+ parallelism = tasks;
+ }
+ topologyBuilder.setSpout(
+ spoutName,
+ new JobHistorySpout(filter, jhfConfigManager),
+ parallelism
+ ).setNumTasks(tasks);
+ // Single progress bolt, allGrouping: it sees every tuple the spout emits.
+ topologyBuilder.setBolt(boltName, new HistoryJobProgressBolt(spoutName, jhfConfigManager), 1).setNumTasks(1).allGrouping(spoutName);
+
+ backtype.storm.Config config = new backtype.storm.Config();
+ config.setNumWorkers(jhfConfigManager.getConfig().getInt("envContextConfig.workers"));
+ // NOTE(review): TOPOLOGY_DEBUG is hard-coded to true — very verbose in
+ // production; consider driving this from configuration instead.
+ config.put(Config.TOPOLOGY_DEBUG, true);
+ if (!jhfConfigManager.getEnv().equals("local")) {
+ //cluster mode
+ //parse conf here
+ StormSubmitter.submitTopology(topologyName, config, topologyBuilder.createTopology());
+ } else {
+ //local mode
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology(topologyName, config, topologyBuilder.createTopology());
+ }
+ } catch (Exception e) {
+ // NOTE(review): broad catch with printStackTrace() swallows startup
+ // failures silently (exit code 0). Prefer SLF4J logging and a non-zero
+ // exit so supervisors can detect a failed launch.
+ e.printStackTrace();
+ }
+ }
+}