You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/08/19 04:17:10 UTC
incubator-eagle git commit: [EAGLE-458] Migrate
eagle-jpm-spark-running using application framework
Repository: incubator-eagle
Updated Branches:
refs/heads/develop 9432fcf91 -> acee5cb33
[EAGLE-458] Migrate eagle-jpm-spark-running using application framework
https://issues.apache.org/jira/browse/EAGLE-458
Author: Hao Chen <ha...@apache.org>
Closes #335 from haoch/EAGLE-458.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/acee5cb3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/acee5cb3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/acee5cb3
Branch: refs/heads/develop
Commit: acee5cb334a266ed1f7cb215d0f8252fd5b4e067
Parents: 9432fcf
Author: Hao Chen <ha...@apache.org>
Authored: Fri Aug 19 12:16:57 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Fri Aug 19 12:16:57 2016 +0800
----------------------------------------------------------------------
.../eagle/app/service/ApplicationContext.java | 20 +-
.../apache/eagle/app/test/AppJUnitRunner.java | 2 +-
.../eagle/app/test/AppTestGuiceModule.java | 42 ----
.../eagle/app/test/ApplicationSimulator.java | 70 +++++++
.../app/test/ApplicationSimulatorImpl.java | 92 +++++++++
.../eagle/app/test/ApplicationTestBase.java | 43 ++++
.../app/test/ApplicationTestGuiceModule.java | 42 ++++
.../apache/eagle/app/test/ServerSimulator.java | 70 -------
.../eagle/app/test/ServerSimulatorImpl.java | 92 ---------
.../example/ExampleApplicationProviderTest.java | 4 +-
eagle-jpm/eagle-jpm-spark-running/pom.xml | 5 +
.../jpm/spark/running/SparkRunningJobApp.java | 67 +++++++
.../spark/running/SparkRunningJobAppConfig.java | 175 +++++++++++++++++
.../running/SparkRunningJobAppProvider.java | 26 +++
.../jpm/spark/running/SparkRunningJobMain.java | 61 +-----
.../common/SparkRunningConfigManager.java | 151 --------------
.../parser/SparkAppEntityCreationHandler.java | 6 +-
.../running/parser/SparkApplicationParser.java | 10 +-
.../running/recover/SparkRunningJobManager.java | 4 +-
.../storm/SparkRunningJobFetchSpout.java | 14 +-
.../running/storm/SparkRunningJobParseBolt.java | 18 +-
...spark.running.SparkRunningJobAppProvider.xml | 195 +++++++++++++++++++
...org.apache.eagle.app.spi.ApplicationProvider | 16 ++
.../src/main/resources/application.conf | 9 +-
.../java/SparkRunningJobAppProviderTest.java | 32 +++
pom.xml | 1 +
26 files changed, 808 insertions(+), 459 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
index 76ee289..91d33ca 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
@@ -73,15 +73,17 @@ public class ApplicationContext implements Serializable, ApplicationLifecycle {
@Override
public void onInstall() {
- List<StreamDesc> streamDescCollection = metadata.getDescriptor().getStreams().stream().map((streamDefinition -> {
- StreamSinkConfig streamSinkConfig = this.runtime.environment().streamSink().getSinkConfig(streamDefinition.getStreamId(),this.config);
- StreamDesc streamDesc = new StreamDesc();
- streamDesc.setSchema(streamDefinition);
- streamDesc.setSink(streamSinkConfig);
- streamDesc.setStreamId(streamDefinition.getStreamId());
- return streamDesc;
- })).collect(Collectors.toList());
- metadata.setStreams(streamDescCollection);
+ if(metadata.getDescriptor().getStreams()!=null) {
+ List<StreamDesc> streamDescCollection = metadata.getDescriptor().getStreams().stream().map((streamDefinition -> {
+ StreamSinkConfig streamSinkConfig = this.runtime.environment().streamSink().getSinkConfig(streamDefinition.getStreamId(), this.config);
+ StreamDesc streamDesc = new StreamDesc();
+ streamDesc.setSchema(streamDefinition);
+ streamDesc.setSink(streamSinkConfig);
+ streamDesc.setStreamId(streamDefinition.getStreamId());
+ return streamDesc;
+ })).collect(Collectors.toList());
+ metadata.setStreams(streamDescCollection);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppJUnitRunner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppJUnitRunner.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppJUnitRunner.java
index 572af2c..b8174bb 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppJUnitRunner.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppJUnitRunner.java
@@ -52,7 +52,7 @@ public class AppJUnitRunner extends BlockJUnit4ClassRunner {
throws InitializationError {
final List<Module> modules = new ArrayList<>();
- AppTestGuiceModule testGuiceModule = new AppTestGuiceModule();
+ ApplicationTestGuiceModule testGuiceModule = new ApplicationTestGuiceModule();
// Add default modules
modules.add(testGuiceModule);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppTestGuiceModule.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppTestGuiceModule.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppTestGuiceModule.java
deleted file mode 100644
index 9b30ee4..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppTestGuiceModule.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.test;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Singleton;
-import org.apache.eagle.app.module.ApplicationExtensionLoader;
-import org.apache.eagle.app.module.ApplicationGuiceModule;
-import org.apache.eagle.common.module.CommonGuiceModule;
-import org.apache.eagle.common.module.GlobalScope;
-import org.apache.eagle.common.module.ModuleRegistry;
-import org.apache.eagle.metadata.service.memory.MemoryMetadataStore;
-
-public class AppTestGuiceModule extends AbstractModule{
- @Override
- protected void configure() {
- CommonGuiceModule common = new CommonGuiceModule();
- ApplicationGuiceModule app = new ApplicationGuiceModule();
- MemoryMetadataStore store = new MemoryMetadataStore();
- install(common);
- install(app);
- install(store);
- ModuleRegistry registry =ApplicationExtensionLoader.load(common,app,store);
- registry.getModules(store.getClass()).forEach(this::install);
- registry.getModules(GlobalScope.class).forEach(this::install);
- bind(ServerSimulator.class).to(ServerSimulatorImpl.class).in(Singleton.class);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulator.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulator.java
new file mode 100644
index 0000000..3e4aa21
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulator.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.test;
+
+import com.google.inject.Guice;
+import com.google.inject.Module;
+import org.apache.eagle.app.spi.ApplicationProvider;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Application test simulator for developer to quickly run application without diving into application lifecycle
+ */
+public abstract class ApplicationSimulator {
+ /**
+ * Start an application identified by its type.
+ * @param appType type of the application to start
+ */
+ public abstract void start(String appType);
+
+ /**
+ * Start an application identified by its type, using the given configuration.
+ * @param appType type of the application to start
+ * @param appConfig application configuration entries
+ */
+ public abstract void start(String appType, Map<String,Object> appConfig);
+
+ /**
+ * Start an application identified by its provider class.
+ * @param appProviderClass provider class of the application to start
+ */
+ public abstract void start(Class<? extends ApplicationProvider> appProviderClass);
+
+ /**
+ * Start an application identified by its provider class, using the given configuration.
+ * @param appProviderClass provider class of the application to start
+ * @param appConfig application configuration entries
+ */
+ public abstract void start(Class<? extends ApplicationProvider> appProviderClass, Map<String,Object> appConfig) throws Exception;
+
+ public static ApplicationSimulator getInstance(){
+ return Guice.createInjector(new ApplicationTestGuiceModule()).getInstance(ApplicationSimulator.class);
+ }
+
+ /**
+ * @param modules additional modules, passed to the injector ahead of a new ApplicationTestGuiceModule
+ * @return ApplicationSimulator instance
+ */
+ public static ApplicationSimulator getInstance(Module ... modules){
+ Module[] contextModules = Arrays.copyOf(modules, modules.length + 1); // Arrays.asList(modules) is fixed-size: add() on it throws UnsupportedOperationException
+ contextModules[modules.length] = new ApplicationTestGuiceModule();
+ return Guice.createInjector(contextModules).getInstance(ApplicationSimulator.class);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
new file mode 100644
index 0000000..35dead2
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.test;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.config.ApplicationProviderConfig;
+import org.apache.eagle.app.resource.ApplicationResource;
+import org.apache.eagle.app.service.ApplicationOperations;
+import org.apache.eagle.app.spi.ApplicationProvider;
+import org.apache.eagle.app.utils.DynamicJarPathFinder;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.metadata.model.SiteEntity;
+import org.apache.eagle.metadata.resource.SiteResource;
+import org.junit.Assert;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ApplicationSimulatorImpl extends ApplicationSimulator { // Simulator implementation driven by the injected site and application resources
+ private final Config config;
+ private final SiteResource siteResource;
+ private final ApplicationResource applicationResource;
+
+ @Inject
+ public ApplicationSimulatorImpl(Config config, SiteResource siteResource, ApplicationResource applicationResource){
+ this.config = config;
+ this.siteResource = siteResource;
+ this.applicationResource = applicationResource;
+ }
+
+ @Override
+ public void start(String appType) { // Convenience overload: same as start(appType, <empty map>)
+ start(appType, new HashMap<>());
+ }
+
+ @Override
+ public void start(String appType, Map<String, Object> appConfig) { // Creates a new site, installs appType on it in LOCAL mode with appConfig, then starts it
+ SiteEntity siteEntity = getUniqueSite();
+ siteResource.createSite(siteEntity);
+ Assert.assertNotNull(siteEntity.getUuid()); // presumably createSite assigns the uuid - confirm against SiteResource
+ ApplicationOperations.InstallOperation installOperation = new ApplicationOperations.InstallOperation(siteEntity.getSiteId(),appType, ApplicationEntity.Mode.LOCAL);
+ installOperation.setConfiguration(appConfig);
+ // Install the application on the site created above
+ ApplicationEntity applicationEntity =
+ applicationResource.installApplication(installOperation).getData();
+ // Start the installed application, addressed by its uuid
+ applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
+ }
+
+ private final static AtomicInteger incr = new AtomicInteger(); // Static counter used to build site ids, shared by all simulator instances
+
+ private SiteEntity getUniqueSite(){ // Builds (but does not persist) a site entity named SIMULATED_SITE_<next counter value>
+ // Populate a new site entity; the site name is set to the same value as the site id
+ SiteEntity siteEntity = new SiteEntity();
+ siteEntity.setSiteId("SIMULATED_SITE_"+incr.incrementAndGet());
+ siteEntity.setSiteName(siteEntity.getSiteId());
+ siteEntity.setDescription("Automatically generated unique simulation site "+siteEntity.getSiteId()+" (simulator: "+this+")");
+ return siteEntity;
+ }
+
+ @Override
+ public void start(Class<? extends ApplicationProvider> appProviderClass) { // Convenience overload: same as start(appProviderClass, <empty map>)
+ start(appProviderClass, new HashMap<>());
+ }
+
+ @Override
+ public void start(Class<? extends ApplicationProvider> appProviderClass, Map<String, Object> appConfig) { // Instantiates the provider reflectively and starts the application type it describes
+ try {
+ ApplicationProvider applicationProvider = appProviderClass.newInstance(); // requires an accessible no-argument constructor
+ applicationProvider.prepare(new ApplicationProviderConfig(DynamicJarPathFinder.findPath(appProviderClass),appProviderClass),config);
+ start(applicationProvider.getApplicationDesc().getType(),appConfig); // delegate to the type-based overload
+ } catch (InstantiationException | IllegalAccessException e) { // reflection failures are rethrown unchecked, keeping the cause
+ throw new IllegalStateException(e.getMessage(),e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestBase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestBase.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestBase.java
new file mode 100644
index 0000000..1c7d6be
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestBase.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.test;
+
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.eagle.common.module.CommonGuiceModule;
+import org.apache.eagle.metadata.service.memory.MemoryMetadataStore;
+import org.junit.After;
+import org.junit.Before;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+
+public class ApplicationTestBase { // Test base class that provides a Guice injector built from ApplicationTestGuiceModule
+ private Injector injector;
+
+ @Before
+ public void setUp(){ // JUnit @Before hook: builds a new injector and injects members into this test instance
+ injector = Guice.createInjector(new ApplicationTestGuiceModule());
+ injector.injectMembers(this);
+ }
+
+ public Injector injector(){ // Returns the injector created by setUp(); null if setUp() has not run
+ return injector;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestGuiceModule.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestGuiceModule.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestGuiceModule.java
new file mode 100644
index 0000000..4f7b2b4
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestGuiceModule.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.test;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Singleton;
+import org.apache.eagle.app.module.ApplicationExtensionLoader;
+import org.apache.eagle.app.module.ApplicationGuiceModule;
+import org.apache.eagle.common.module.CommonGuiceModule;
+import org.apache.eagle.common.module.GlobalScope;
+import org.apache.eagle.common.module.ModuleRegistry;
+import org.apache.eagle.metadata.service.memory.MemoryMetadataStore;
+
+public class ApplicationTestGuiceModule extends AbstractModule{ // Guice module used by the application test utilities in this package
+ @Override
+ protected void configure() { // Installs the common, application and in-memory metadata store modules plus loaded extension modules
+ CommonGuiceModule common = new CommonGuiceModule();
+ ApplicationGuiceModule app = new ApplicationGuiceModule();
+ MemoryMetadataStore store = new MemoryMetadataStore();
+ install(common);
+ install(app);
+ install(store);
+ ModuleRegistry registry =ApplicationExtensionLoader.load(common,app,store); // registry of extension modules returned by the loader
+ registry.getModules(store.getClass()).forEach(this::install); // modules registered under the metadata store's class
+ registry.getModules(GlobalScope.class).forEach(this::install); // modules registered under GlobalScope
+ bind(ApplicationSimulator.class).to(ApplicationSimulatorImpl.class).in(Singleton.class); // one simulator instance per injector
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulator.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulator.java
deleted file mode 100644
index a91af77..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulator.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.test;
-
-import com.google.inject.Guice;
-import com.google.inject.Module;
-import org.apache.eagle.app.spi.ApplicationProvider;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Application test simulator for developer to quickly run application without diving into application lifecycle
- */
-public abstract class ServerSimulator {
- /**
- *
- * @param appType
- */
- public abstract void start(String appType);
-
- /**
- *
- * @param appType
- * @param appConfig
- */
- public abstract void start(String appType, Map<String,Object> appConfig);
-
- /**
- *
- * @param appProviderClass
- */
- public abstract void start(Class<? extends ApplicationProvider> appProviderClass);
-
- /**
- *
- * @param appProviderClass
- * @param appConfig
- */
- public abstract void start(Class<? extends ApplicationProvider> appProviderClass, Map<String,Object> appConfig) throws Exception;
-
- public static ServerSimulator getInstance(){
- return Guice.createInjector(new AppTestGuiceModule()).getInstance(ServerSimulator.class);
- }
-
- /**
- * @param modules additional modules
- * @return ServerSimulator instance
- */
- public static ServerSimulator getInstance(Module ... modules){
- List<Module> contextModules = Arrays.asList(modules);
- contextModules.add(new AppTestGuiceModule());
- return Guice.createInjector(contextModules).getInstance(ServerSimulator.class);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java
deleted file mode 100644
index 1ef91ff..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.test;
-
-import com.google.inject.Inject;
-import com.typesafe.config.Config;
-import org.apache.eagle.app.config.ApplicationProviderConfig;
-import org.apache.eagle.app.resource.ApplicationResource;
-import org.apache.eagle.app.service.ApplicationOperations;
-import org.apache.eagle.app.spi.ApplicationProvider;
-import org.apache.eagle.app.utils.DynamicJarPathFinder;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.apache.eagle.metadata.model.SiteEntity;
-import org.apache.eagle.metadata.resource.SiteResource;
-import org.junit.Assert;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class ServerSimulatorImpl extends ServerSimulator {
- private final Config config;
- private final SiteResource siteResource;
- private final ApplicationResource applicationResource;
-
- @Inject
- public ServerSimulatorImpl(Config config, SiteResource siteResource, ApplicationResource applicationResource){
- this.config = config;
- this.siteResource = siteResource;
- this.applicationResource = applicationResource;
- }
-
- @Override
- public void start(String appType) {
- start(appType, new HashMap<>());
- }
-
- @Override
- public void start(String appType, Map<String, Object> appConfig) {
- SiteEntity siteEntity = getUniqueSite();
- siteResource.createSite(siteEntity);
- Assert.assertNotNull(siteEntity.getUuid());
- ApplicationOperations.InstallOperation installOperation = new ApplicationOperations.InstallOperation(siteEntity.getSiteId(),appType, ApplicationEntity.Mode.LOCAL);
- installOperation.setConfiguration(appConfig);
- // Install application
- ApplicationEntity applicationEntity =
- applicationResource.installApplication(installOperation).getData();
- // Start application
- applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
- }
-
- private final static AtomicInteger incr = new AtomicInteger();
-
- private SiteEntity getUniqueSite(){
- // Create local site
- SiteEntity siteEntity = new SiteEntity();
- siteEntity.setSiteId("SIMULATED_SITE_"+incr.incrementAndGet());
- siteEntity.setSiteName(siteEntity.getSiteId());
- siteEntity.setDescription("Automatically generated unique simulation site "+siteEntity.getSiteId()+" (simulator: "+this+")");
- return siteEntity;
- }
-
- @Override
- public void start(Class<? extends ApplicationProvider> appProviderClass) {
- start(appProviderClass, new HashMap<>());
- }
-
- @Override
- public void start(Class<? extends ApplicationProvider> appProviderClass, Map<String, Object> appConfig) {
- try {
- ApplicationProvider applicationProvider = appProviderClass.newInstance();
- applicationProvider.prepare(new ApplicationProviderConfig(DynamicJarPathFinder.findPath(appProviderClass),appProviderClass),config);
- start(applicationProvider.getApplicationDesc().getType(),appConfig);
- } catch (InstantiationException | IllegalAccessException e) {
- throw new IllegalStateException(e.getMessage(),e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java b/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java
index e07f487..1c801bd 100644
--- a/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java
+++ b/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java
@@ -21,7 +21,7 @@ import org.apache.eagle.app.example.extensions.ExampleEntity;
import org.apache.eagle.app.example.extensions.ExampleResource;
import org.apache.eagle.app.resource.ApplicationResource;
import org.apache.eagle.app.service.ApplicationOperations;
-import org.apache.eagle.app.test.ServerSimulator;
+import org.apache.eagle.app.test.ApplicationSimulator;
import org.apache.eagle.app.test.AppJUnitRunner;
import org.apache.eagle.common.module.GlobalScope;
import org.apache.eagle.metadata.model.ApplicationDesc;
@@ -41,7 +41,7 @@ import java.util.Map;
public class ExampleApplicationProviderTest {
@Inject private SiteResource siteResource;
@Inject private ApplicationResource applicationResource;
- @Inject private ServerSimulator simulator;
+ @Inject private ApplicationSimulator simulator;
@Inject private ExampleResource exampleResource;
@Test
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/pom.xml b/eagle-jpm/eagle-jpm-spark-running/pom.xml
index cc53e7c..34d8545 100644
--- a/eagle-jpm/eagle-jpm-spark-running/pom.xml
+++ b/eagle-jpm/eagle-jpm-spark-running/pom.xml
@@ -132,6 +132,11 @@
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-app-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
<resources>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
new file mode 100644
index 0000000..61c0751
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.spark.running;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.jpm.spark.running.storm.SparkRunningJobFetchSpout;
+import org.apache.eagle.jpm.spark.running.storm.SparkRunningJobParseBolt;
+
+/**
+ * Storm application that wires the Spark running-job fetch spout to the
+ * Spark running-job parse bolt.
+ */
+public class SparkRunningJobApp extends StormApplication {
+    /**
+     * Builds the topology: one fetch spout feeding one parse bolt, with tuples
+     * grouped on the "appId" field.
+     *
+     * @param config      application configuration handed to {@link SparkRunningJobAppConfig#getInstance(Config)}
+     * @param environment Storm environment; not used by this method
+     * @return the assembled topology
+     */
+    @Override
+    public StormTopology execute(Config config, StormEnvironment environment) {
+        // 1. initialise the application config from the supplied Config
+        final SparkRunningJobAppConfig appConfig = SparkRunningJobAppConfig.getInstance(config);
+        final String fetchSpoutId = SparkRunningJobAppConfig.JOB_FETCH_SPOUT_NAME;
+        final String parseBoltId = SparkRunningJobAppConfig.JOB_PARSE_BOLT_NAME;
+
+        // 2. assemble the topology
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        // The parallelism hint is capped at the configured number of tasks.
+        final int spoutTasks = appConfig.getTopologyConfig().jobFetchSpoutTasksNum;
+        final int spoutParallelism = Math.min(appConfig.getTopologyConfig().jobFetchSpoutParallism, spoutTasks);
+        final SparkRunningJobFetchSpout fetchSpout = new SparkRunningJobFetchSpout(
+            appConfig.getJobExtractorConfig(),
+            appConfig.getEndpointConfig(),
+            appConfig.getZkStateConfig());
+        builder.setSpout(fetchSpoutId, fetchSpout, spoutParallelism).setNumTasks(spoutTasks);
+
+        // Same cap for the parse bolt.
+        final int boltTasks = appConfig.getTopologyConfig().jobParseBoltTasksNum;
+        final int boltParallelism = Math.min(appConfig.getTopologyConfig().jobParseBoltParallism, boltTasks);
+        final SparkRunningJobParseBolt parseBolt = new SparkRunningJobParseBolt(
+            appConfig.getZkStateConfig(),
+            appConfig.getEagleServiceConfig(),
+            appConfig.getEndpointConfig(),
+            appConfig.getJobExtractorConfig());
+        builder.setBolt(parseBoltId, parseBolt, boltParallelism)
+            .setNumTasks(boltTasks)
+            .fieldsGrouping(fetchSpoutId, new Fields("appId"));
+
+        return builder.createTopology();
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
new file mode 100644
index 0000000..668bc02
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.spark.running;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.dataproc.util.ConfigOptionParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/**
+ * Configuration holder for the Spark running-job application.
+ *
+ * <p>A single static instance is shared by the whole class; every call to one of the
+ * {@code getInstance} methods (re)populates that same instance from the given source.
+ */
+public class SparkRunningJobAppConfig implements Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(SparkRunningJobAppConfig.class);
+    static final String JOB_FETCH_SPOUT_NAME = "sparkRunningJobFetchSpout";
+    static final String JOB_PARSE_BOLT_NAME = "sparkRunningJobParseBolt";
+    /** Port used when {@code eagleProps.eagleService.port} is not configured. */
+    private static final int DEFAULT_EAGLE_SERVICE_PORT = 8080;
+
+    /** @return the value read from {@code envContextConfig.env} */
+    public String getEnv() {
+        return env;
+    }
+    private String env;
+
+    /** @return the ZooKeeper settings read from {@code zookeeperConfig.*} */
+    ZKStateConfig getZkStateConfig() {
+        return zkStateConfig;
+    }
+    private ZKStateConfig zkStateConfig;
+    private TopologyConfig topologyConfig;
+
+    /** @return parallelism and task counts for the spout and the bolt */
+    public TopologyConfig getTopologyConfig() {
+        return topologyConfig;
+    }
+
+    /** @return the settings read from {@code eagleProps.eagleService.*} */
+    public EagleServiceConfig getEagleServiceConfig() {
+        return eagleServiceConfig;
+    }
+    private EagleServiceConfig eagleServiceConfig;
+
+    /** @return the settings read from {@code jobExtractorConfig.*} */
+    public JobExtractorConfig getJobExtractorConfig() {
+        return jobExtractorConfig;
+    }
+    private JobExtractorConfig jobExtractorConfig;
+
+    /** @return the settings read from {@code dataSourceConfig.*} */
+    public EndpointConfig getEndpointConfig() {
+        return endpointConfig;
+    }
+    private EndpointConfig endpointConfig;
+
+    /** Values of {@code envContextConfig.parallelismConfig.<name>} and {@code envContextConfig.tasks.<name>}. */
+    public static class TopologyConfig implements Serializable {
+        public int jobFetchSpoutParallism;
+        public int jobFetchSpoutTasksNum;
+        public int jobParseBoltParallism;
+        public int jobParseBoltTasksNum;
+    }
+
+    /** Values of the {@code zookeeperConfig.*} keys. */
+    public static class ZKStateConfig implements Serializable {
+        public String zkQuorum;
+        public String zkRoot;
+        public int zkSessionTimeoutMs;
+        public int zkRetryTimes;
+        public int zkRetryInterval;
+        public String zkPort;
+        public boolean recoverEnabled;
+    }
+
+    /** Values of the {@code eagleProps.eagleService.*} keys. */
+    public static class EagleServiceConfig implements Serializable {
+        public String eagleServiceHost;
+        public int eagleServicePort;
+        public int readTimeoutSeconds;
+        public int maxFlushNum;
+        public String username;
+        public String password;
+    }
+
+    /** Values of the {@code jobExtractorConfig.*} keys. */
+    public static class JobExtractorConfig implements Serializable {
+        public String site;
+        public int fetchRunningJobInterval;
+        public int parseThreadPoolSize;
+    }
+
+    /** Values of the {@code dataSourceConfig.*} keys. */
+    public static class EndpointConfig implements Serializable {
+        public String nnEndpoint;
+        public String eventLog;
+        public String[] rmUrls;
+        public String principal;
+        public String keyTab;
+    }
+
+    /** @return the {@link Config} most recently passed to initialisation, or {@code null} if none was */
+    public Config getConfig() {
+        return config;
+    }
+    private Config config;
+
+    private static SparkRunningJobAppConfig manager = new SparkRunningJobAppConfig();
+
+    private SparkRunningJobAppConfig() {
+        this.eagleServiceConfig = new EagleServiceConfig();
+        this.jobExtractorConfig = new JobExtractorConfig();
+        this.endpointConfig = new EndpointConfig();
+        this.zkStateConfig = new ZKStateConfig();
+        this.topologyConfig = new TopologyConfig();
+    }
+
+    /**
+     * Loads the configuration through {@link ConfigOptionParser} and populates the shared instance.
+     *
+     * <p>Failures are not rethrown: the error (including its cause) is logged and the shared
+     * instance is returned as-is, i.e. without having been (re)initialised by this call.
+     *
+     * @param args command-line arguments forwarded to {@code ConfigOptionParser.load}
+     * @return the shared instance
+     */
+    public static SparkRunningJobAppConfig getInstance(String[] args) {
+        try {
+            LOG.info("Loading from configuration file");
+            manager.init(new ConfigOptionParser().load(args));
+        } catch (Exception e) {
+            // Keep the no-rethrow contract, but do not drop the cause.
+            LOG.error("failed to load config", e);
+        }
+        return manager;
+    }
+
+    /**
+     * Populates the shared instance from an already-loaded {@link Config}.
+     *
+     * @param config configuration to read; exceptions raised while reading it propagate to the caller
+     * @return the shared instance
+     */
+    public static SparkRunningJobAppConfig getInstance(Config config) {
+        manager.init(config);
+        return manager;
+    }
+
+    /** Copies every supported key of {@code config} into the nested holder objects. */
+    private void init(Config config) {
+        this.config = config;
+        this.env = config.getString("envContextConfig.env");
+        this.zkStateConfig.zkQuorum = config.getString("zookeeperConfig.zkQuorum");
+        this.zkStateConfig.zkPort = config.getString("zookeeperConfig.zkPort");
+        this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeperConfig.zkSessionTimeoutMs");
+        this.zkStateConfig.zkRetryTimes = config.getInt("zookeeperConfig.zkRetryTimes");
+        this.zkStateConfig.zkRetryInterval = config.getInt("zookeeperConfig.zkRetryInterval");
+        this.zkStateConfig.zkRoot = config.getString("zookeeperConfig.zkRoot");
+        this.zkStateConfig.recoverEnabled = config.getBoolean("zookeeperConfig.recoverEnabled");
+
+        // parse eagle service endpoint
+        this.eagleServiceConfig.eagleServiceHost = config.getString("eagleProps.eagleService.host");
+        // Config.getString never returns null (it throws when the path is absent), so the
+        // default port can only take effect if presence is checked with hasPath first.
+        final String portPath = "eagleProps.eagleService.port";
+        this.eagleServiceConfig.eagleServicePort = config.hasPath(portPath)
+            ? Integer.parseInt(config.getString(portPath))
+            : DEFAULT_EAGLE_SERVICE_PORT;
+        this.eagleServiceConfig.username = config.getString("eagleProps.eagleService.username");
+        this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password");
+        this.eagleServiceConfig.readTimeoutSeconds = config.getInt("eagleProps.eagleService.readTimeOutSeconds");
+        this.eagleServiceConfig.maxFlushNum = config.getInt("eagleProps.eagleService.maxFlushNum");
+
+        //parse job extractor
+        this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site");
+        this.jobExtractorConfig.fetchRunningJobInterval = config.getInt("jobExtractorConfig.fetchRunningJobInterval");
+        this.jobExtractorConfig.parseThreadPoolSize = config.getInt("jobExtractorConfig.parseThreadPoolSize");
+
+        //parse data source config
+        this.endpointConfig.eventLog = config.getString("dataSourceConfig.eventLog");
+        this.endpointConfig.nnEndpoint = config.getString("dataSourceConfig.nnEndpoint");
+        this.endpointConfig.keyTab = config.getString("dataSourceConfig.keytab");
+        this.endpointConfig.principal = config.getString("dataSourceConfig.principal");
+
+        // rmUrls is a single comma-separated string; entries are not trimmed.
+        this.endpointConfig.rmUrls = config.getString("dataSourceConfig.rmUrls").split(",");
+
+        this.topologyConfig.jobFetchSpoutParallism = config.getInt("envContextConfig.parallelismConfig." + JOB_FETCH_SPOUT_NAME);
+        this.topologyConfig.jobFetchSpoutTasksNum = config.getInt("envContextConfig.tasks." + JOB_FETCH_SPOUT_NAME);
+        this.topologyConfig.jobParseBoltParallism = config.getInt("envContextConfig.parallelismConfig." + JOB_PARSE_BOLT_NAME);
+        this.topologyConfig.jobParseBoltTasksNum = config.getInt("envContextConfig.tasks." + JOB_PARSE_BOLT_NAME);
+
+        LOG.info("Successfully initialized SparkRunningJobAppConfig");
+        LOG.info("env: {}", this.env);
+        LOG.info("site: {}", this.jobExtractorConfig.site);
+        LOG.info("eagle.service.host: {}", this.eagleServiceConfig.eagleServiceHost);
+        LOG.info("eagle.service.port: {}", this.eagleServiceConfig.eagleServicePort);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppProvider.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppProvider.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppProvider.java
new file mode 100644
index 0000000..3d20af7
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppProvider.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.spark.running;
+
+import org.apache.eagle.app.spi.AbstractApplicationProvider;
+
+/**
+ * Application provider that supplies {@link SparkRunningJobApp} instances.
+ */
+public class SparkRunningJobAppProvider extends AbstractApplicationProvider<SparkRunningJobApp> {
+    /**
+     * Creates a new application object on every call.
+     *
+     * @return a freshly constructed {@link SparkRunningJobApp}
+     */
+    @Override
+    public SparkRunningJobApp getApplication() {
+        return new SparkRunningJobApp();
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobMain.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobMain.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobMain.java
index 749f4d1..fe4a68c 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobMain.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobMain.java
@@ -18,67 +18,8 @@
package org.apache.eagle.jpm.spark.running;
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager;
-import org.apache.eagle.jpm.spark.running.storm.SparkRunningJobFetchSpout;
-import org.apache.eagle.jpm.spark.running.storm.SparkRunningJobParseBolt;
-
public class SparkRunningJobMain {
public static void main(String[] args) {
- try {
- //1. trigger init conf
- SparkRunningConfigManager sparkRunningConfigManager = SparkRunningConfigManager.getInstance(args);
-
- //2. init topology
- TopologyBuilder topologyBuilder = new TopologyBuilder();
- String topologyName = sparkRunningConfigManager.getConfig().getString("envContextConfig.topologyName");
- String spoutName = "sparkRunningJobFetchSpout";
- String boltName = "sparkRunningJobParseBolt";
- int parallelism = sparkRunningConfigManager.getConfig().getInt("envContextConfig.parallelismConfig." + spoutName);
- int tasks = sparkRunningConfigManager.getConfig().getInt("envContextConfig.tasks." + spoutName);
- if (parallelism > tasks) {
- parallelism = tasks;
- }
- topologyBuilder.setSpout(
- spoutName,
- new SparkRunningJobFetchSpout(
- sparkRunningConfigManager.getJobExtractorConfig(),
- sparkRunningConfigManager.getEndpointConfig(),
- sparkRunningConfigManager.getZkStateConfig()),
- parallelism
- ).setNumTasks(tasks);
-
- parallelism = sparkRunningConfigManager.getConfig().getInt("envContextConfig.parallelismConfig." + boltName);
- tasks = sparkRunningConfigManager.getConfig().getInt("envContextConfig.tasks." + boltName);
- if (parallelism > tasks) {
- parallelism = tasks;
- }
- topologyBuilder.setBolt(boltName,
- new SparkRunningJobParseBolt(
- sparkRunningConfigManager.getZkStateConfig(),
- sparkRunningConfigManager.getEagleServiceConfig(),
- sparkRunningConfigManager.getEndpointConfig(),
- sparkRunningConfigManager.getJobExtractorConfig()),
- parallelism).setNumTasks(tasks).fieldsGrouping(spoutName, new Fields("appId"));
-
- backtype.storm.Config config = new backtype.storm.Config();
- config.setNumWorkers(sparkRunningConfigManager.getConfig().getInt("envContextConfig.workers"));
- config.put(Config.TOPOLOGY_DEBUG, true);
- if (!sparkRunningConfigManager.getEnv().equals("local")) {
- //cluster mode
- //parse conf here
- StormSubmitter.submitTopology(topologyName, config, topologyBuilder.createTopology());
- } else {
- //local mode
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology(topologyName, config, topologyBuilder.createTopology());
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
+ new SparkRunningJobApp().run(args);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/common/SparkRunningConfigManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/common/SparkRunningConfigManager.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/common/SparkRunningConfigManager.java
deleted file mode 100644
index b05d12e..0000000
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/common/SparkRunningConfigManager.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.spark.running.common;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.util.ConfigOptionParser;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-
-public class SparkRunningConfigManager implements Serializable {
- private static final Logger LOG = LoggerFactory.getLogger(SparkRunningConfigManager.class);
-
- public String getEnv() {
- return env;
- }
- private String env;
-
- public ZKStateConfig getZkStateConfig() { return zkStateConfig; }
- private ZKStateConfig zkStateConfig;
-
- public EagleServiceConfig getEagleServiceConfig() {
- return eagleServiceConfig;
- }
- private EagleServiceConfig eagleServiceConfig;
-
- public JobExtractorConfig getJobExtractorConfig() {
- return jobExtractorConfig;
- }
- private JobExtractorConfig jobExtractorConfig;
-
- public EndpointConfig getEndpointConfig() {
- return endpointConfig;
- }
- private EndpointConfig endpointConfig;
-
- public static class ZKStateConfig implements Serializable {
- public String zkQuorum;
- public String zkRoot;
- public int zkSessionTimeoutMs;
- public int zkRetryTimes;
- public int zkRetryInterval;
- public String zkPort;
- public boolean recoverEnabled;
- }
-
- public static class EagleServiceConfig implements Serializable {
- public String eagleServiceHost;
- public int eagleServicePort;
- public int readTimeoutSeconds;
- public int maxFlushNum;
- public String username;
- public String password;
- }
-
- public static class JobExtractorConfig implements Serializable {
- public String site;
- public int fetchRunningJobInterval;
- public int parseThreadPoolSize;
- }
-
- public static class EndpointConfig implements Serializable {
- public String nnEndpoint;
- public String eventLog;
- public String[] rmUrls;
- public String principal;
- public String keyTab;
- }
-
- public Config getConfig() {
- return config;
- }
- private Config config;
-
- private static SparkRunningConfigManager manager = new SparkRunningConfigManager();
-
- private SparkRunningConfigManager() {
- this.eagleServiceConfig = new EagleServiceConfig();
- this.jobExtractorConfig = new JobExtractorConfig();
- this.endpointConfig = new EndpointConfig();
- this.zkStateConfig = new ZKStateConfig();
- }
-
- public static SparkRunningConfigManager getInstance(String[] args) {
- manager.init(args);
- return manager;
- }
-
- private void init(String[] args) {
- try {
- LOG.info("Loading from configuration file");
- this.config = new ConfigOptionParser().load(args);
- } catch (Exception e) {
- LOG.error("failed to load config");
- }
-
- this.env = config.getString("envContextConfig.env");
-
- this.zkStateConfig.zkQuorum = config.getString("zookeeperConfig.zkQuorum");
- this.zkStateConfig.zkPort = config.getString("zookeeperConfig.zkPort");
- this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeperConfig.zkSessionTimeoutMs");
- this.zkStateConfig.zkRetryTimes = config.getInt("zookeeperConfig.zkRetryTimes");
- this.zkStateConfig.zkRetryInterval = config.getInt("zookeeperConfig.zkRetryInterval");
- this.zkStateConfig.zkRoot = config.getString("zookeeperConfig.zkRoot");
- this.zkStateConfig.recoverEnabled = config.getBoolean("zookeeperConfig.recoverEnabled");
-
- // parse eagle service endpoint
- this.eagleServiceConfig.eagleServiceHost = config.getString("eagleProps.eagleService.host");
- String port = config.getString("eagleProps.eagleService.port");
- this.eagleServiceConfig.eagleServicePort = (port == null ? 8080 : Integer.parseInt(port));
- this.eagleServiceConfig.username = config.getString("eagleProps.eagleService.username");
- this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password");
- this.eagleServiceConfig.readTimeoutSeconds = config.getInt("eagleProps.eagleService.readTimeOutSeconds");
- this.eagleServiceConfig.maxFlushNum = config.getInt("eagleProps.eagleService.maxFlushNum");
-
- //parse job extractor
- this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site");
- this.jobExtractorConfig.fetchRunningJobInterval = config.getInt("jobExtractorConfig.fetchRunningJobInterval");
- this.jobExtractorConfig.parseThreadPoolSize = config.getInt("jobExtractorConfig.parseThreadPoolSize");
-
- //parse data source config
- this.endpointConfig.eventLog = config.getString("dataSourceConfig.eventLog");
- this.endpointConfig.nnEndpoint = config.getString("dataSourceConfig.nnEndpoint");
- this.endpointConfig.keyTab = config.getString("dataSourceConfig.keytab");
- this.endpointConfig.principal = config.getString("dataSourceConfig.principal");
- this.endpointConfig.rmUrls = config.getStringList("dataSourceConfig.rmUrls").toArray(new String[0]);
-
- LOG.info("Successfully initialized SparkRunningConfigManager");
- LOG.info("env: " + this.env);
- LOG.info("site: " + this.jobExtractorConfig.site);
- LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost);
- LOG.info("eagle.service.port: " + this.eagleServiceConfig.eagleServicePort);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java
index 5491a80..92adfa8 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java
@@ -18,7 +18,7 @@
package org.apache.eagle.jpm.spark.running.parser;
-import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager;
+import org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.service.client.IEagleServiceClient;
import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
@@ -32,9 +32,9 @@ public class SparkAppEntityCreationHandler {
private static final Logger LOG = LoggerFactory.getLogger(SparkAppEntityCreationHandler.class);
private List<TaggedLogAPIEntity> entities = new ArrayList<>();
- private SparkRunningConfigManager.EagleServiceConfig eagleServiceConfig;
+ private SparkRunningJobAppConfig.EagleServiceConfig eagleServiceConfig;
- public SparkAppEntityCreationHandler(SparkRunningConfigManager.EagleServiceConfig eagleServiceConfig) {
+ public SparkAppEntityCreationHandler(SparkRunningJobAppConfig.EagleServiceConfig eagleServiceConfig) {
this.eagleServiceConfig = eagleServiceConfig;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
index bb76213..b2a5b63 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
@@ -20,7 +20,7 @@ package org.apache.eagle.jpm.spark.running.parser;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.eagle.jpm.spark.crawl.EventType;
-import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager;
+import org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig;
import org.apache.eagle.jpm.spark.running.entities.*;
import org.apache.eagle.jpm.spark.running.recover.SparkRunningJobManager;
import org.apache.eagle.jpm.util.Constants;
@@ -64,7 +64,7 @@ public class SparkApplicationParser implements Runnable {
private Map<Integer, Pair<Integer, Pair<Long, Long>>> stagesTime;
private Set<Integer> completeStages;
private Configuration hdfsConf;
- private SparkRunningConfigManager.EndpointConfig endpointConfig;
+ private SparkRunningJobAppConfig.EndpointConfig endpointConfig;
private final Object lock = new Object();
private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
private Map<String, String> commonTags = new HashMap<>();
@@ -78,9 +78,9 @@ public class SparkApplicationParser implements Runnable {
OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
}
- public SparkApplicationParser(SparkRunningConfigManager.EagleServiceConfig eagleServiceConfig,
- SparkRunningConfigManager.EndpointConfig endpointConfig,
- SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig,
+ public SparkApplicationParser(SparkRunningJobAppConfig.EagleServiceConfig eagleServiceConfig,
+ SparkRunningJobAppConfig.EndpointConfig endpointConfig,
+ SparkRunningJobAppConfig.JobExtractorConfig jobExtractorConfig,
AppInfo app, Map<String, SparkAppEntity> sparkApp,
SparkRunningJobManager sparkRunningJobManager, ResourceFetcher rmResourceFetcher) {
this.sparkAppEntityCreationHandler = new SparkAppEntityCreationHandler(eagleServiceConfig);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
index 2b6c62f..11f7909 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
@@ -19,7 +19,7 @@
package org.apache.eagle.jpm.spark.running.recover;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager;
+import org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig;
import org.apache.eagle.jpm.spark.running.entities.SparkAppEntity;
import org.apache.eagle.jpm.util.jobrecover.RunningJobManager;
import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
@@ -30,7 +30,7 @@ import java.util.*;
public class SparkRunningJobManager implements Serializable {
private RunningJobManager runningJobManager;
- public SparkRunningJobManager(SparkRunningConfigManager.ZKStateConfig config) {
+ public SparkRunningJobManager(SparkRunningJobAppConfig.ZKStateConfig config) {
this.runningJobManager = new RunningJobManager(config.zkQuorum,
config.zkSessionTimeoutMs, config.zkRetryTimes, config.zkRetryInterval, config.zkRoot);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
index 6be0cfd..256829e 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
@@ -24,7 +24,7 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
-import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager;
+import org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig;
import org.apache.eagle.jpm.spark.running.entities.SparkAppEntity;
import org.apache.eagle.jpm.spark.running.recover.SparkRunningJobManager;
import org.apache.eagle.jpm.util.Constants;
@@ -40,18 +40,18 @@ import java.util.*;
public class SparkRunningJobFetchSpout extends BaseRichSpout {
private static final Logger LOG = LoggerFactory.getLogger(SparkRunningJobFetchSpout.class);
- private SparkRunningConfigManager.ZKStateConfig zkStateConfig;
- private SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig;
- private SparkRunningConfigManager.EndpointConfig endpointConfig;
+ private SparkRunningJobAppConfig.ZKStateConfig zkStateConfig;
+ private SparkRunningJobAppConfig.JobExtractorConfig jobExtractorConfig;
+ private SparkRunningJobAppConfig.EndpointConfig endpointConfig;
private ResourceFetcher resourceFetcher;
private SpoutOutputCollector collector;
private boolean init;
private transient SparkRunningJobManager sparkRunningJobManager;
private Set<String> runningYarnApps;
- public SparkRunningJobFetchSpout(SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig,
- SparkRunningConfigManager.EndpointConfig endpointConfig,
- SparkRunningConfigManager.ZKStateConfig zkStateConfig) {
+ public SparkRunningJobFetchSpout(SparkRunningJobAppConfig.JobExtractorConfig jobExtractorConfig,
+ SparkRunningJobAppConfig.EndpointConfig endpointConfig,
+ SparkRunningJobAppConfig.ZKStateConfig zkStateConfig) {
this.jobExtractorConfig = jobExtractorConfig;
this.endpointConfig = endpointConfig;
this.zkStateConfig = zkStateConfig;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
index 6928240..d207ffc 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
@@ -23,7 +23,7 @@ import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
-import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager;
+import org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig;
import org.apache.eagle.jpm.spark.running.entities.SparkAppEntity;
import org.apache.eagle.jpm.spark.running.parser.SparkApplicationParser;
import org.apache.eagle.jpm.spark.running.recover.SparkRunningJobManager;
@@ -44,17 +44,17 @@ import java.util.concurrent.ExecutorService;
public class SparkRunningJobParseBolt extends BaseRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(SparkRunningJobParseBolt.class);
- private SparkRunningConfigManager.ZKStateConfig zkStateConfig;
- private SparkRunningConfigManager.EagleServiceConfig eagleServiceConfig;
- private SparkRunningConfigManager.EndpointConfig endpointConfig;
- private SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig;
+ private SparkRunningJobAppConfig.ZKStateConfig zkStateConfig;
+ private SparkRunningJobAppConfig.EagleServiceConfig eagleServiceConfig;
+ private SparkRunningJobAppConfig.EndpointConfig endpointConfig;
+ private SparkRunningJobAppConfig.JobExtractorConfig jobExtractorConfig;
private ExecutorService executorService;
private Map<String, SparkApplicationParser> runningSparkParsers;
private ResourceFetcher resourceFetcher;
- public SparkRunningJobParseBolt(SparkRunningConfigManager.ZKStateConfig zkStateConfig,
- SparkRunningConfigManager.EagleServiceConfig eagleServiceConfig,
- SparkRunningConfigManager.EndpointConfig endpointConfig,
- SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig) {
+ public SparkRunningJobParseBolt(SparkRunningJobAppConfig.ZKStateConfig zkStateConfig,
+ SparkRunningJobAppConfig.EagleServiceConfig eagleServiceConfig,
+ SparkRunningJobAppConfig.EndpointConfig endpointConfig,
+ SparkRunningJobAppConfig.JobExtractorConfig jobExtractorConfig) {
this.zkStateConfig = zkStateConfig;
this.eagleServiceConfig = eagleServiceConfig;
this.endpointConfig = endpointConfig;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.running.SparkRunningJobAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.running.SparkRunningJobAppProvider.xml b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.running.SparkRunningJobAppProvider.xml
new file mode 100644
index 0000000..24cf09e
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.running.SparkRunningJobAppProvider.xml
@@ -0,0 +1,195 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<application>
+ <type>SPARK_RUNNING_JOB_APP</type>
+ <name>Spark Running Job Monitoring</name>
+ <version>0.5.0-incubating</version>
+ <appClass>org.apache.eagle.jpm.spark.running.SparkRunningJobApp</appClass>
+ <viewPath>/apps/jpm</viewPath>
+ <configuration>
+ <!-- org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig -->
+ <property>
+ <name>envContextConfig.env</name>
+ <value>local</value>
+ <displayName>Environment</displayName>
+ <description>Execution environment</description>
+ </property>
+ <property>
+ <name>zookeeperConfig.zkQuorum</name>
+ <displayName>zkQuorum</displayName>
+ <description>Zookeeper Quorum</description>
+ <value>sandbox.hortonworks.com:2181</value>
+ </property>
+ <property>
+ <name>zookeeperConfig.zkPort</name>
+ <displayName>zkPort</displayName>
+ <description>Zookeeper Port</description>
+ <value>2181</value>
+ </property>
+ <property>
+ <name>zookeeperConfig.zkSessionTimeoutMs</name>
+ <displayName>zkSessionTimeoutMs</displayName>
+ <description>Zookeeper session timeoutMs</description>
+ <value>15000</value>
+ </property>
+ <property>
+ <name>zookeeperConfig.zkRetryTimes</name>
+ <displayName>zkRetryTimes</displayName>
+ <description>zookeeperConfig.zkRetryTimes</description>
+ <value>3</value>
+ </property>
+ <property>
+ <name>zookeeperConfig.zkRetryInterval</name>
+ <displayName>zkRetryInterval</displayName>
+ <description>zookeeperConfig.zkRetryInterval</description>
+ <value>20000</value>
+ </property>
+ <property>
+ <name>zookeeperConfig.zkRoot</name>
+ <value>/apps/spark/running</value>
+ </property>
+ <property>
+ <name>zookeeperConfig.recoverEnabled</name>
+ <description>zookeeperConfig.recoverEnabled</description>
+ <value>false</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.host</name>
+ <description>eagleProps.eagleService.host</description>
+ <value>sandbox.hortonworks.com</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.port</name>
+ <description>eagleProps.eagleService.port</description>
+ <value>9099</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.username</name>
+ <description>eagleProps.eagleService.username</description>
+ <value>admin</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.password</name>
+ <description>eagleProps.eagleService.password</description>
+ <value>secret</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.readTimeOutSeconds</name>
+ <description>eagleProps.eagleService.readTimeOutSeconds</description>
+ <value>20</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.maxFlushNum</name>
+ <description>eagleProps.eagleService.maxFlushNum</description>
+ <value>500</value>
+ </property>
+ <property>
+ <name>jobExtractorConfig.site</name>
+ <description>jobExtractorConfig.site</description>
+ <value>sandbox</value>
+ </property>
+ <property>
+ <name>jobExtractorConfig.fetchRunningJobInterval</name>
+ <description>jobExtractorConfig.fetchRunningJobInterval</description>
+ <value>15</value>
+ </property>
+ <property>
+ <name>jobExtractorConfig.parseThreadPoolSize</name>
+ <description>jobExtractorConfig.parseThreadPoolSize</description>
+ <value>5</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.eventLog</name>
+ <description>dataSourceConfig.eventLog</description>
+ <value>/spark-history</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.nnEndpoint</name>
+ <description>dataSourceConfig.nnEndpoint</description>
+ <value>hdfs://sandbox.hortonworks.com:8020</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.keytab</name>
+ <description>dataSourceConfig.keytab</description>
+ <value></value>
+ </property>
+ <property>
+ <name>dataSourceConfig.principal</name>
+ <description>dataSourceConfig.principal</description>
+ <value></value>
+ </property>
+ <property>
+ <name>dataSourceConfig.rmUrls</name>
+ <description>dataSourceConfig.rmUrls</description>
+ <value>http://sandbox.hortonworks.com:8088</value>
+ </property>
+ <property>
+ <name>envContextConfig.parallelismConfig.sparkRunningJobFetchSpout</name>
+ <description>Parallelism of sparkRunningJobFetchSpout </description>
+ <value>1</value>
+ </property>
+ <property>
+ <name>envContextConfig.tasks.sparkRunningJobFetchSpout</name>
+ <description>Tasks Num of sparkRunningJobFetchSpout </description>
+ <value>4</value>
+ </property>
+ <property>
+ <name>envContextConfig.parallelismConfig.sparkRunningJobParseBolt</name>
+ <description>Parallelism of sparkRunningJobParseBolt </description>
+ <value>1</value>
+ </property>
+ <property>
+ <name>envContextConfig.tasks.sparkRunningJobParseBolt</name>
+ <description>Tasks Num of sparkRunningJobParseBolt</description>
+ <value>4</value>
+ </property>
+ </configuration>
+ <docs>
+ <install>
+ # Step 1: Create source kafka topic named "${site}_example_source_topic"
+
+ ./bin/kafka-topics.sh --create --topic example_source_topic --replication-factor 1 --partitions 1
+
+ # Step 2: Set up a data collector to flow data into the kafka topic
+
+ ./bin/logstash -f log_collector.conf
+
+ ## A sample `log_collector.conf` follows:
+
+ input {
+
+ }
+ filter {
+
+ }
+ output{
+
+ }
+
+ # Step 3: start application
+
+ # Step 4: monitor with featured portal or alert with policies
+ </install>
+ <uninstall>
+ # Step 1: stop and uninstall application
+ # Step 2: delete kafka topic named "${site}_example_source_topic"
+ # Step 3: stop logstash
+ </uninstall>
+ </docs>
+</application>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
new file mode 100644
index 0000000..6aef879
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.eagle.jpm.spark.running.SparkRunningJobAppProvider
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
index d93a135..9d9f622 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
@@ -15,8 +15,6 @@
{
"envContextConfig" : {
- "env" : "local",
- "topologyName" : "sparkRunningJob",
"stormConfigFile" : "storm.yaml",
"parallelismConfig" : {
"sparkRunningJobFetchSpout" : 1,
@@ -28,15 +26,13 @@
},
"workers" : 2
},
-
"jobExtractorConfig" : {
"site" : "sandbox",
"fetchRunningJobInterval" : 15,
"parseThreadPoolSize" : 5
},
-
"dataSourceConfig" : {
- "rmUrls": ["http://sandbox.hortonworks.com:8088"],
+ "rmUrls": "http://sandbox.hortonworks.com:8088",
"nnEndpoint" : "hdfs://sandbox.hortonworks.com:8020",
"principal" : "", #if not need, then empty
"keytab" : "",
@@ -52,7 +48,8 @@
"zkRetryTimes" : 3,
"zkRetryInterval" : 20000
},
-
+ "appId":"sparkRunningJob",
+ "mode":"LOCAL",
"eagleProps" : {
"mailHost" : "abc.com",
"mailDebug" : "true",
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/test/java/SparkRunningJobAppProviderTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/test/java/SparkRunningJobAppProviderTest.java b/eagle-jpm/eagle-jpm-spark-running/src/test/java/SparkRunningJobAppProviderTest.java
new file mode 100644
index 0000000..346171a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/test/java/SparkRunningJobAppProviderTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.inject.Inject;
+import org.apache.eagle.app.test.ApplicationSimulator;
+import org.apache.eagle.app.test.ApplicationTestBase;
+import org.apache.eagle.jpm.spark.running.SparkRunningJobAppProvider;
+import org.junit.Test;
+
+public class SparkRunningJobAppProviderTest extends ApplicationTestBase {
+ @Inject
+ ApplicationSimulator simulator;
+
+ @Test
+ public void testRunWithProvider(){
+ simulator.start(SparkRunningJobAppProvider.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 04589f9..72d0ad7 100755
--- a/pom.xml
+++ b/pom.xml
@@ -1190,6 +1190,7 @@
<exclude>README*</exclude>
<exclude>**/*.log</exclude>
<exclude>**/*.out</exclude>
+ <exclude>**/*.db</exclude>
<exclude>**/eagle.log*</exclude>
<exclude>**/velocity.log*</exclude>
<!-- all json files should be excluded -->