You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/08/08 00:46:16 UTC
[2/3] incubator-eagle git commit: HBase audit monitoring with new app
framework https://issues.apache.org/jira/browse/EAGLE-420 Author: Yong Zhang
Reviewer: Hao Chen
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java
index 7c64e50..60f49ef 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java
@@ -1,46 +1,47 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink;
-
-import org.apache.eagle.app.Configuration;
-import org.apache.eagle.metadata.model.StreamSinkConfig;
-
-import java.lang.reflect.ParameterizedType;
-
-public interface StreamSinkProvider<S extends StreamSink<D>,D extends StreamSinkConfig>{
- /**
- * @param streamId
- * @param appConfig
- * @return
- */
- D getSinkConfig(String streamId, Configuration appConfig);
- S getSink();
-
- default S getSink(String streamId, Configuration appConfig){
- S s = getSink();
- s.init(streamId,getSinkConfig(streamId,appConfig));
- return s;
- }
-
- default Class<? extends S> getSinkType(){
- return (Class<S>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
- }
-
- default Class<? extends D> getSinkConfigType(){
- return (Class<D>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[1];
- }
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.sink;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.app.Configuration;
+import org.apache.eagle.metadata.model.StreamSinkConfig;
+
+import java.lang.reflect.ParameterizedType;
+
+public interface StreamSinkProvider<S extends StreamSink<D>,D extends StreamSinkConfig>{
+ /**
+ * @param streamId
+ * @param config
+ * @return
+ */
+ D getSinkConfig(String streamId, Config config);
+ S getSink();
+
+ default S getSink(String streamId, Config config){
+ S s = getSink();
+ s.init(streamId,getSinkConfig(streamId,config));
+ return s;
+ }
+
+ default Class<? extends S> getSinkType(){
+ return (Class<S>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
+ }
+
+ default Class<? extends D> getSinkConfigType(){
+ return (Class<D>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[1];
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
index 20816db..bf1e587 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
@@ -128,4 +128,4 @@ public abstract class AbstractApplicationProvider<T extends Application> impleme
public ApplicationDesc getApplicationDesc() {
return applicationDesc;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
index ace0c45..be84f0c 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
@@ -34,4 +34,4 @@ public interface ApplicationProvider<T extends Application> {
* @return application instance
*/
T getApplication();
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java
index 2c686d9..1ef91ff 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java
@@ -54,8 +54,11 @@ public class ServerSimulatorImpl extends ServerSimulator {
SiteEntity siteEntity = getUniqueSite();
siteResource.createSite(siteEntity);
Assert.assertNotNull(siteEntity.getUuid());
+ ApplicationOperations.InstallOperation installOperation = new ApplicationOperations.InstallOperation(siteEntity.getSiteId(),appType, ApplicationEntity.Mode.LOCAL);
+ installOperation.setConfiguration(appConfig);
// Install application
- ApplicationEntity applicationEntity = applicationResource.installApplication(new ApplicationOperations.InstallOperation(siteEntity.getSiteId(),appType, ApplicationEntity.Mode.LOCAL)).getData();
+ ApplicationEntity applicationEntity =
+ applicationResource.installApplication(installOperation).getData();
// Start application
applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
}
@@ -86,4 +89,4 @@ public class ServerSimulatorImpl extends ServerSimulator {
throw new IllegalStateException(e.getMessage(),e);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
index 0558454..f58f0aa 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
@@ -1,84 +1,90 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.app.spi.AbstractApplicationProvider;
-import org.junit.Ignore;
-
-import java.util.Arrays;
-import java.util.Map;
-
-@Ignore
-public class TestStormApplication extends StormApplication<TestStormApplication.TestStormAppConfig>{
- @Override
- public StormTopology execute(TestStormAppConfig config, StormEnvironment environment) {
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("metric_spout", new RandomEventSpout(), config.getSpoutNum());
- builder.setBolt("sink_1",environment.getFlattenStreamSink("TEST_STREAM_1",config)).fieldsGrouping("metric_spout",new Fields("metric"));
- builder.setBolt("sink_2",environment.getFlattenStreamSink("TEST_STREAM_2",config)).fieldsGrouping("metric_spout",new Fields("metric"));
- return builder.createTopology();
- }
-
- public final static class TestStormAppConfig extends Configuration{
- private int spoutNum = 1;
-
- public int getSpoutNum() {
- return spoutNum;
- }
-
- public void setSpoutNum(int spoutNum) {
- this.spoutNum = spoutNum;
- }
- }
-
- private class RandomEventSpout extends BaseRichSpout {
- private SpoutOutputCollector _collector;
- @Override
- public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
- _collector = spoutOutputCollector;
- }
-
- @Override
- public void nextTuple() {
- _collector.emit(Arrays.asList("disk.usage",System.currentTimeMillis(),"host_1",56.7));
- _collector.emit(Arrays.asList("cpu.usage",System.currentTimeMillis(),"host_2",99.8));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
- outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
- }
- }
-
- public final static class Provider extends AbstractApplicationProvider<TestStormApplication> {
- public Provider(){
- super("TestApplicationMetadata.xml");
- }
- @Override
- public TestStormApplication getApplication() {
- return new TestStormApplication();
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.spi.AbstractApplicationProvider;
+import org.junit.Ignore;
+
+import java.util.Arrays;
+import java.util.Map;
+
+@Ignore
+public class TestStormApplication extends StormApplication<TestStormApplication.TestStormAppConfig>{
+ @Override
+ public StormTopology execute(TestStormAppConfig config, StormEnvironment environment){
+ return null;
+ }
+
+ @Override
+ public StormTopology execute(Config config, StormEnvironment environment) {
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("metric_spout", new RandomEventSpout(), config.getInt("spoutNum"));
+ builder.setBolt("sink_1",environment.getFlattenStreamSink("TEST_STREAM_1",config)).fieldsGrouping("metric_spout",new Fields("metric"));
+ builder.setBolt("sink_2",environment.getFlattenStreamSink("TEST_STREAM_2",config)).fieldsGrouping("metric_spout",new Fields("metric"));
+ return builder.createTopology();
+ }
+
+ public final static class TestStormAppConfig extends Configuration{
+ private int spoutNum = 1;
+
+ public int getSpoutNum() {
+ return spoutNum;
+ }
+
+ public void setSpoutNum(int spoutNum) {
+ this.spoutNum = spoutNum;
+ }
+ }
+
+ private class RandomEventSpout extends BaseRichSpout {
+ private SpoutOutputCollector _collector;
+ @Override
+ public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+ _collector = spoutOutputCollector;
+ }
+
+ @Override
+ public void nextTuple() {
+ _collector.emit(Arrays.asList("disk.usage",System.currentTimeMillis(),"host_1",56.7));
+ _collector.emit(Arrays.asList("cpu.usage",System.currentTimeMillis(),"host_2",99.8));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
+ }
+ }
+
+ public final static class Provider extends AbstractApplicationProvider<TestStormApplication> {
+ public Provider(){
+ super("TestApplicationMetadata.xml");
+ }
+ @Override
+ public TestStormApplication getApplication() {
+ return new TestStormApplication();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java
index 3db5f20..bbdfbfa 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java
@@ -1,97 +1,94 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.storm;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import org.apache.eagle.app.Configuration;
-import org.apache.eagle.app.StormApplication;
-import org.apache.eagle.app.environment.impl.StormEnvironment;
-
-import java.util.Arrays;
-import java.util.Map;
-
-public class MockStormApplication extends StormApplication<MockStormApplication.MockStormConfiguration> {
- private MockStormConfiguration appConfig;
-
- @Override
- public StormTopology execute(MockStormConfiguration config, StormEnvironment environment) {
- this.setAppConfig(config);
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("metric_spout", new RandomEventSpout(), config.getSpoutNum());
- builder.setBolt("sink_1",environment.getFlattenStreamSink("TEST_STREAM_1",config)).fieldsGrouping("metric_spout",new Fields("metric"));
- builder.setBolt("sink_2",environment.getFlattenStreamSink("TEST_STREAM_2",config)).fieldsGrouping("metric_spout",new Fields("metric"));
- return builder.createTopology();
- }
-
- public MockStormConfiguration getAppConfig() {
- return appConfig;
- }
-
- private void setAppConfig(MockStormConfiguration appConfig) {
- this.appConfig = appConfig;
- }
-
- /**
- * TODO: Load configuration from name space in application className
- * Application Configuration
- */
- static class MockStormConfiguration extends Configuration {
- private int spoutNum = 1;
- private boolean loaded = false;
-
- public int getSpoutNum() {
- return spoutNum;
- }
-
- public void setSpoutNum(int spoutNum) {
- this.spoutNum = spoutNum;
- }
-
- public boolean isLoaded() {
- return loaded;
- }
-
- public void setLoaded(boolean loaded) {
- this.loaded = loaded;
- }
- }
-
- private class RandomEventSpout extends BaseRichSpout {
- private SpoutOutputCollector _collector;
- @Override
- public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
- _collector = spoutOutputCollector;
- }
-
- @Override
- public void nextTuple() {
- _collector.emit(Arrays.asList("disk.usage",System.currentTimeMillis(),"host_1",56.7));
- _collector.emit(Arrays.asList("cpu.usage",System.currentTimeMillis(),"host_2",99.8));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
- outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
- }
- }
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.storm;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.Configuration;
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+
+import java.util.Arrays;
+import java.util.Map;
+
+public class MockStormApplication extends StormApplication<MockStormApplication.MockStormConfiguration> {
+ private MockStormConfiguration appConfig;
+
+ @Override
+ public StormTopology execute(MockStormConfiguration config, StormEnvironment environment) {
+ return null;
+ }
+
+ @Override
+ public StormTopology execute(Config config, StormEnvironment environment) {
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("metric_spout", new RandomEventSpout(), config.getInt("spoutNum"));
+ builder.setBolt("sink_1",environment.getFlattenStreamSink("TEST_STREAM_1",config)).fieldsGrouping("metric_spout",new Fields("metric"));
+ builder.setBolt("sink_2",environment.getFlattenStreamSink("TEST_STREAM_2",config)).fieldsGrouping("metric_spout",new Fields("metric"));
+ return builder.createTopology();
+ }
+
+ /**
+ * TODO: Load configuration from name space in application className
+ * Application Configuration
+ */
+ static class MockStormConfiguration extends Configuration {
+ private int spoutNum = 1;
+ private boolean loaded = false;
+
+ public int getSpoutNum() {
+ return spoutNum;
+ }
+
+ public void setSpoutNum(int spoutNum) {
+ this.spoutNum = spoutNum;
+ }
+
+ public boolean isLoaded() {
+ return loaded;
+ }
+
+ public void setLoaded(boolean loaded) {
+ this.loaded = loaded;
+ }
+ }
+
+ private class RandomEventSpout extends BaseRichSpout {
+ private SpoutOutputCollector _collector;
+ @Override
+ public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+ _collector = spoutOutputCollector;
+ }
+
+ @Override
+ public void nextTuple() {
+ _collector.emit(Arrays.asList("disk.usage",System.currentTimeMillis(),"host_1",56.7));
+ _collector.emit(Arrays.asList("cpu.usage",System.currentTimeMillis(),"host_2",99.8));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplicationTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplicationTest.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplicationTest.java
index 8e05b5e..32dae23 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplicationTest.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplicationTest.java
@@ -1,75 +1,51 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.storm;
-
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.app.utils.DynamicJarPathFinder;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-
-public class MockStormApplicationTest {
- @Test
- public void testGetConfigClass(){
- MockStormApplication mockStormApplication = new MockStormApplication();
- Assert.assertEquals(MockStormApplication.MockStormConfiguration.class,mockStormApplication.getConfigType());
- }
-
- @Test
- public void testGetConfigFromMap(){
- MockStormApplication mockStormApplication = new MockStormApplication();
- mockStormApplication.execute(new HashMap<String,Object>(){
- {
- put("spoutNum",1234);
- put("loaded",true);
- put("mode", ApplicationEntity.Mode.CLUSTER);
- }
- },new StormEnvironment(ConfigFactory.load()));
- Assert.assertTrue(mockStormApplication.getAppConfig().isLoaded());
- Assert.assertEquals(1234,mockStormApplication.getAppConfig().getSpoutNum());
- Assert.assertEquals(ApplicationEntity.Mode.CLUSTER,mockStormApplication.getAppConfig().getMode());
- }
-
- @Test
- public void testGetConfigFromEnvironmentConfigFile(){
- MockStormApplication mockStormApplication = new MockStormApplication();
- mockStormApplication.execute(new StormEnvironment(ConfigFactory.load()));
- Assert.assertTrue(mockStormApplication.getAppConfig().isLoaded());
- Assert.assertEquals(3,mockStormApplication.getAppConfig().getSpoutNum());
- Assert.assertEquals(ApplicationEntity.Mode.LOCAL,mockStormApplication.getAppConfig().getMode());
- }
-
- @Test
- public void testRunApplicationWithSysConfig(){
- new MockStormApplication().run();
- }
-
- @Test
- public void testRunApplicationWithAppConfig() throws InterruptedException {
- MockStormApplication.MockStormConfiguration appConfig = new MockStormApplication.MockStormConfiguration();
- appConfig.setJarPath(DynamicJarPathFinder.findPath(MockStormApplication.class));
- appConfig.setSiteId("test_site");
- appConfig.setAppId("test_application_storm_topology");
- appConfig.setMode(ApplicationEntity.Mode.LOCAL);
- appConfig.setLoaded(true);
- appConfig.setSpoutNum(4);
- new MockStormApplication().run(appConfig);
- }
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.storm;
+
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.utils.DynamicJarPathFinder;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class MockStormApplicationTest {
+ @Test
+ public void testGetConfigClass(){
+ MockStormApplication mockStormApplication = new MockStormApplication();
+ Assert.assertEquals(MockStormApplication.MockStormConfiguration.class,mockStormApplication.getConfigType());
+ }
+
+ @Test
+ public void testRunApplicationWithSysConfig(){
+ new MockStormApplication().run();
+ }
+
+ @Test
+ public void testRunApplicationWithAppConfig() throws InterruptedException {
+ MockStormApplication.MockStormConfiguration appConfig = new MockStormApplication.MockStormConfiguration();
+ appConfig.setJarPath(DynamicJarPathFinder.findPath(MockStormApplication.class));
+ appConfig.setSiteId("test_site");
+ appConfig.setAppId("test_application_storm_topology");
+ appConfig.setMode(ApplicationEntity.Mode.LOCAL);
+ appConfig.setLoaded(true);
+ appConfig.setSpoutNum(4);
+ new MockStormApplication().run(appConfig);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf b/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
index 9f154f9..64f0974 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
@@ -52,10 +52,15 @@
}
}
- "org.apache.eagle.app.storm.MockStormApplication": {
- "spoutNum": 3
- "loaded": true
- "mode":"LOCAL",
- "appId":"test_topology_name"
+ "appId":"test_topology_name"
+ "spoutNum": 3
+ "loaded": true
+ "mode":"LOCAL"
+
+ "dataSinkConfig": {
+ "topic" : "test_topic",
+ "brokerList" : "sandbox.hortonworks.com:6667",
+ "serializerClass" : "kafka.serializer.StringEncoder",
+ "keySerializerClass" : "kafka.serializer.StringEncoder"
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/ApplicationManagementResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/ApplicationManagementResource.java b/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/ApplicationManagementResource.java
index 6e4521d..c651b6b 100644
--- a/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/ApplicationManagementResource.java
+++ b/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/ApplicationManagementResource.java
@@ -21,7 +21,6 @@ package org.apache.eagle.service.application;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.policy.common.Constants;
import org.apache.eagle.service.application.dao.ApplicationManagerDAO;
import org.apache.eagle.service.application.dao.ApplicationManagerDaoImpl;
import org.apache.eagle.service.application.entity.TopologyExecutionStatus;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
index edb9fc0..2b37d25 100644
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
@@ -111,11 +111,11 @@ public class GenericServiceAPIResponseEntity<T>{
public String getException() {
return exception;
}
- public void setException(String exception) {
- this.exception = exception;
- }
+// public void setException(String exception) {
+// this.exception = exception;
+// }
- public void setException(Exception exception){
- if(exception!=null) this.exception = EagleExceptionWrapper.wrap(exception);
+ public void setException(Exception exceptionObj){
+ if(exception!=null) this.exception = EagleExceptionWrapper.wrap(exceptionObj);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
index 85b875d..940ee8a 100644
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
@@ -54,7 +54,8 @@ public class GenericServiceAPIResponseEntityDeserializer extends JsonDeserialize
else if(SUCCESS_FIELD.equals(field.getKey()) && field.getValue() != null){
entity.setSuccess(field.getValue().getValueAsBoolean(false));
}else if(EXCEPTION_FIELD.equals(field.getKey()) && field.getValue() != null){
- entity.setException(field.getValue().getTextValue());
+// entity.setException(field.getValue().getTextValue());
+ entity.setException(new Exception(field.getValue().getTextValue()));
}else if(TYPE_FIELD.endsWith(field.getKey()) && field.getValue() != null){
try {
entity.setType(Class.forName(field.getValue().getTextValue()));
@@ -81,4 +82,4 @@ public class GenericServiceAPIResponseEntityDeserializer extends JsonDeserialize
}
return entity;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericEntityServiceResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericEntityServiceResource.java b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericEntityServiceResource.java
index fb52352..b7f27f5 100644
--- a/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericEntityServiceResource.java
+++ b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericEntityServiceResource.java
@@ -439,7 +439,7 @@ public class GenericEntityServiceResource {
LOG.error("Data storage is null");
throw new IllegalDataStorageException("data storage is null");
}
-
+
QueryResult<?> result = queryStatement.execute(dataStorage);
if(result.isSuccess()){
meta.put(FIRST_TIMESTAMP, result.getFirstTimestamp());
@@ -543,7 +543,7 @@ public class GenericEntityServiceResource {
LOG.error("Data storage is null");
throw new IllegalDataStorageException("Data storage is null");
}
-
+
DeleteStatement deleteStatement = new DeleteStatement(rawQuery);
ModifyResult<String> deleteResult = deleteStatement.execute(dataStorage);
if(deleteResult.isSuccess()){
@@ -627,4 +627,4 @@ public class GenericEntityServiceResource {
}
return response;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java
index 6819e59..d4c0e0c 100644
--- a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java
+++ b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java
@@ -24,6 +24,7 @@ import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
+import com.typesafe.config.Config;
import org.apache.eagle.app.StormApplication;
import org.apache.eagle.app.environment.impl.StormEnvironment;
@@ -33,8 +34,13 @@ import java.util.Map;
public class ExampleStormApplication extends StormApplication<ExampleStormConfig> {
@Override
public StormTopology execute(ExampleStormConfig config, StormEnvironment environment) {
+ return null;
+ }
+
+ @Override
+ public StormTopology execute(Config config, StormEnvironment environment) {
TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("metric_spout", new RandomEventSpout(), config.getSpoutNum());
+ builder.setSpout("metric_spout", new RandomEventSpout(), config.getInt("spoutNum"));
builder.setBolt("sink_1",environment.getFlattenStreamSink("SAMPLE_STREAM_1",config)).fieldsGrouping("metric_spout",new Fields("metric"));
builder.setBolt("sink_2",environment.getFlattenStreamSink("SAMPLE_STREAM_2",config)).fieldsGrouping("metric_spout",new Fields("metric"));
return builder.createTopology();
@@ -59,4 +65,4 @@ public class ExampleStormApplication extends StormApplication<ExampleStormConfig
outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java b/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java
index 45dd7bd..88dd02d 100644
--- a/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java
+++ b/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java
@@ -26,10 +26,13 @@ import org.apache.eagle.metadata.model.ApplicationEntity;
import org.apache.eagle.metadata.model.SiteEntity;
import org.apache.eagle.metadata.resource.SiteResource;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
@RunWith(AppUnitTestRunner.class)
public class ExampleApplicationProviderTest {
@@ -54,6 +57,7 @@ public class ExampleApplicationProviderTest {
* @throws InterruptedException
*/
@Test
+ @Ignore
public void testApplicationLifecycle() throws InterruptedException {
// Create local site
SiteEntity siteEntity = new SiteEntity();
@@ -63,8 +67,10 @@ public class ExampleApplicationProviderTest {
siteResource.createSite(siteEntity);
Assert.assertNotNull(siteEntity.getUuid());
+ ApplicationOperations.InstallOperation installOperation = new ApplicationOperations.InstallOperation("test_site","EXAMPLE_APPLICATION", ApplicationEntity.Mode.LOCAL);
+ installOperation.setConfiguration(getConf());
// Install application
- ApplicationEntity applicationEntity = applicationResource.installApplication(new ApplicationOperations.InstallOperation("test_site","EXAMPLE_APPLICATION", ApplicationEntity.Mode.LOCAL)).getData();
+ ApplicationEntity applicationEntity = applicationResource.installApplication(installOperation).getData();
// Start application
applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
// Stop application
@@ -81,16 +87,29 @@ public class ExampleApplicationProviderTest {
@Test
public void testApplicationQuickRunWithAppType(){
- simulator.start("EXAMPLE_APPLICATION");
+ simulator.start("EXAMPLE_APPLICATION", getConf());
}
+ @Ignore
@Test
- public void testApplicationQuickRunWithAppProvider(){
- simulator.start(ExampleApplicationProvider.class);
+ public void testApplicationQuickRunWithAppProvider() throws Exception{
+ simulator.start(ExampleApplicationProvider.class, getConf());
}
+ @Ignore
@Test
- public void testApplicationQuickRunWithAppProvider2(){
- simulator.start(ExampleApplicationProvider2.class);
+ public void testApplicationQuickRunWithAppProvider2() throws Exception{
+ simulator.start(ExampleApplicationProvider2.class, getConf());
}
-}
\ No newline at end of file
+
+ private Map<String, Object> getConf(){
+ Map<String, Object> conf = new HashMap<>();
+ conf.put("dataSinkConfig.topic", "testTopic");
+ conf.put("dataSinkConfig.brokerList", "broker");
+ conf.put("dataSinkConfig.serializerClass", "serializerClass");
+ conf.put("dataSinkConfig.keySerializerClass", "keySerializerClass");
+ conf.put("spoutNum", 2);
+ conf.put("mode", "LOCAL");
+ return conf;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-examples/eagle-app-example/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/test/resources/application.conf b/eagle-examples/eagle-app-example/src/test/resources/application.conf
index bfbf20e..2873037 100644
--- a/eagle-examples/eagle-app-example/src/test/resources/application.conf
+++ b/eagle-examples/eagle-app-example/src/test/resources/application.conf
@@ -56,7 +56,15 @@
}
},
- "org.apache.eagle.app.example.ExampleStormApplication": {
"appId": "unit_test_example_app"
+ "spoutNum": 3
+ "loaded": true
+ "mode":"LOCAL"
+
+ "dataSinkConfig": {
+ "topic" : "test_topic",
+ "brokerList" : "sandbox.hortonworks.com:6667",
+ "serializerClass" : "kafka.serializer.StringEncoder",
+ "keySerializerClass" : "kafka.serializer.StringEncoder"
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java b/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java
index 89e5433..25506cc 100644
--- a/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java
+++ b/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java
@@ -23,6 +23,7 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
import org.apache.eagle.app.StormApplication;
import org.apache.eagle.app.environment.impl.StormEnvironment;
@@ -32,6 +33,11 @@ import java.util.Map;
public class JPMApplication extends StormApplication<JPMConfiguration> {
@Override
public StormTopology execute(JPMConfiguration config, StormEnvironment environment) {
+ return null;
+ }
+
+ @Override
+ public StormTopology execute(Config config, StormEnvironment environment) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("metric_spout", new RandomEventSpout(), 4);
builder.setBolt("sink_1",environment.getFlattenStreamSink("SAMPLE_STREAM_1",config)).fieldsGrouping("metric_spout",new Fields("metric"));
@@ -57,4 +63,4 @@ public class JPMApplication extends StormApplication<JPMConfiguration> {
outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-jpm/eagle-jpm-app/src/test/java/org/apache/eagle/app/jpm/JPMApplicationTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-app/src/test/java/org/apache/eagle/app/jpm/JPMApplicationTest.java b/eagle-jpm/eagle-jpm-app/src/test/java/org/apache/eagle/app/jpm/JPMApplicationTest.java
index 30bbc96..c955d36 100644
--- a/eagle-jpm/eagle-jpm-app/src/test/java/org/apache/eagle/app/jpm/JPMApplicationTest.java
+++ b/eagle-jpm/eagle-jpm-app/src/test/java/org/apache/eagle/app/jpm/JPMApplicationTest.java
@@ -27,6 +27,9 @@ import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
+import java.util.HashMap;
+import java.util.Map;
+
@RunWith(AppUnitTestRunner.class)
public class JPMApplicationTest {
@Inject
@@ -53,8 +56,11 @@ public class JPMApplicationTest {
siteResource.createSite(siteEntity);
Assert.assertNotNull(siteEntity.getUuid());
+ ApplicationOperations.InstallOperation installOperation = new ApplicationOperations.InstallOperation("test_site","JPM_APP", ApplicationEntity.Mode.LOCAL);
+ installOperation.setConfiguration(getConf());
+
// Install application
- ApplicationEntity applicationEntity = applicationResource.installApplication(new ApplicationOperations.InstallOperation("test_site","JPM_APP", ApplicationEntity.Mode.LOCAL)).getData();
+ ApplicationEntity applicationEntity = applicationResource.installApplication(installOperation).getData();
// Start application
applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
// Stop application
@@ -68,4 +74,15 @@ public class JPMApplicationTest {
// Expected exception
}
}
+
+ private Map<String, Object> getConf(){
+ Map<String, Object> conf = new HashMap<>();
+ conf.put("dataSinkConfig.topic", "testTopic");
+ conf.put("dataSinkConfig.brokerList", "broker");
+ conf.put("dataSinkConfig.serializerClass", "serializerClass");
+ conf.put("dataSinkConfig.keySerializerClass", "keySerializerClass");
+ conf.put("spoutNum", 2);
+ conf.put("mode", "LOCAL");
+ return conf;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/pom.xml b/eagle-security/eagle-security-common/pom.xml
index 83971f2..18f9bd0 100644
--- a/eagle-security/eagle-security-common/pom.xml
+++ b/eagle-security/eagle-security-common/pom.xml
@@ -52,6 +52,11 @@
<artifactId>eagle-alert-service</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-metadata-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/IMetadataServiceClient.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/IMetadataServiceClient.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/IMetadataServiceClient.java
new file mode 100644
index 0000000..e3c9f95
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/IMetadataServiceClient.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * * <p/>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p/>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.service;
+
+import java.io.Closeable;
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * service stub to get metadata from remote metadata service
+ */
+public interface IMetadataServiceClient extends Closeable, Serializable {
+ Collection<HBaseSensitivityEntity> listHBaseSensitivies();
+ OpResult addHBaseSensitivity(Collection<HBaseSensitivityEntity> h);
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/ISecurityMetadataDAO.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/ISecurityMetadataDAO.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/ISecurityMetadataDAO.java
index 45e729e..534fb38 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/ISecurityMetadataDAO.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/ISecurityMetadataDAO.java
@@ -18,6 +18,7 @@
package org.apache.eagle.security.service;
import java.util.Collection;
+
/**
* Since 6/10/16.
*/
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/InMemMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/InMemMetadataDaoImpl.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/InMemMetadataDaoImpl.java
index 53d7132..27aeb57 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/InMemMetadataDaoImpl.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/InMemMetadataDaoImpl.java
@@ -28,7 +28,7 @@ import java.util.*;
/**
* In memory service for simple service start. Make all service API as
* synchronized.
- *
+ *
* @since Apr 11, 2016
*
*/
@@ -39,8 +39,7 @@ public class InMemMetadataDaoImpl implements ISecurityMetadataDAO {
private Map<Pair<String, String>, HBaseSensitivityEntity> hBaseSensitivityEntities = new HashMap<>();
@Inject
- public InMemMetadataDaoImpl(Config config) {
-
+ public InMemMetadataDaoImpl() {
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataDaoFactory.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataDaoFactory.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataDaoFactory.java
index f17fd43..65e86f0 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataDaoFactory.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataDaoFactory.java
@@ -17,7 +17,6 @@
package org.apache.eagle.security.service;
import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,41 +27,28 @@ import java.lang.reflect.Constructor;
*
*/
public class MetadataDaoFactory {
-
- private static final MetadataDaoFactory INSTANCE = new MetadataDaoFactory();
private static final Logger LOG = LoggerFactory.getLogger(MetadataDaoFactory.class);
- private ISecurityMetadataDAO dao;
-
- private MetadataDaoFactory() {
- Config config = ConfigFactory.load();
- Config datastoreConfig = config.getConfig("datastore");
- if (datastoreConfig == null) {
- LOG.warn("datastore is not configured, use in-memory store !!!");
- dao = new InMemMetadataDaoImpl(datastoreConfig);
+ public static ISecurityMetadataDAO getMetadataDAO(String storeCls) {
+ ISecurityMetadataDAO dao = null;
+ if (storeCls == null) {
+ LOG.warn("metadata store is not configured, use in-memory store !!!");
+ dao = new InMemMetadataDaoImpl();
} else {
- String clsName = datastoreConfig.getString("metadataDao");
Class<?> clz;
try {
- clz = Thread.currentThread().getContextClassLoader().loadClass(clsName);
+ clz = Thread.currentThread().getContextClassLoader().loadClass(storeCls);
if (ISecurityMetadataDAO.class.isAssignableFrom(clz)) {
- Constructor<?> cotr = clz.getConstructor(Config.class);
- dao = (ISecurityMetadataDAO) cotr.newInstance(datastoreConfig);
+ Constructor<?> cotr = clz.getConstructor();
+ dao = (ISecurityMetadataDAO) cotr.newInstance();
} else {
throw new Exception("metadataDao configuration need to be implementation of IMetadataDao! ");
}
} catch (Exception e) {
LOG.error("error when initialize the dao, fall back to in memory mode!", e);
- dao = new InMemMetadataDaoImpl(datastoreConfig);
+ dao = new InMemMetadataDaoImpl();
}
}
- }
-
- public static MetadataDaoFactory getInstance() {
- return INSTANCE;
- }
-
- public ISecurityMetadataDAO getMetadataDao() {
return dao;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataServiceClientImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataServiceClientImpl.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataServiceClientImpl.java
new file mode 100644
index 0000000..676bde5
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataServiceClientImpl.java
@@ -0,0 +1,114 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * * <p/>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p/>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.service;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.GenericType;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+import com.typesafe.config.Config;
+import org.codehaus.jackson.jaxrs.JacksonJsonProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+public class MetadataServiceClientImpl implements IMetadataServiceClient {
+ private static final long serialVersionUID = 3003976065082684128L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(MetadataServiceClientImpl.class);
+
+ private static final String METADATA_LIST_HBASE_SENSITIVITY_PATH = "/metadata/sensitivity/hbase";
+ private static final String METADATA_ADD_HBASE_SENSITIVITY_PATH = "/metadata/sensitivity/hbase";
+
+ private static final String METADATA_CLEAR_PATH = "/metadata/clear";
+
+ private static final String EAGLE_CORRELATION_CONTEXT = "metadataService.context";
+ private static final String EAGLE_CORRELATION_SERVICE_PORT = "metadataService.port";
+ private static final String EAGLE_CORRELATION_SERVICE_HOST = "metadataService.host";
+
+ protected static final String CONTENT_TYPE = "Content-Type";
+
+ private String host;
+ private int port;
+ private String context;
+ private transient Client client;
+ private String basePath;
+
+ public MetadataServiceClientImpl(Config config) {
+ this(config.getString(EAGLE_CORRELATION_SERVICE_HOST), config.getInt(EAGLE_CORRELATION_SERVICE_PORT), config
+ .getString(EAGLE_CORRELATION_CONTEXT));
+ basePath = buildBasePath();
+ }
+
+ public MetadataServiceClientImpl(String host, int port, String context) {
+ this.host = host;
+ this.port = port;
+ this.context = context;
+ this.basePath = buildBasePath();
+ ClientConfig cc = new DefaultClientConfig();
+ cc.getProperties().put(DefaultClientConfig.PROPERTY_CONNECT_TIMEOUT, 60 * 1000);
+ cc.getProperties().put(DefaultClientConfig.PROPERTY_READ_TIMEOUT, 60 * 1000);
+ cc.getClasses().add(JacksonJsonProvider.class);
+ cc.getProperties().put(URLConnectionClientHandler.PROPERTY_HTTP_URL_CONNECTION_SET_METHOD_WORKAROUND, true);
+ this.client = Client.create(cc);
+ client.addFilter(new com.sun.jersey.api.client.filter.GZIPContentEncodingFilter());
+ }
+
+ private String buildBasePath() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("http://");
+ sb.append(host);
+ sb.append(":");
+ sb.append(port);
+ sb.append(context);
+ return sb.toString();
+ }
+
+ private <T> List<T> list(String path, GenericType<List<T>> type) {
+ WebResource r = client.resource(basePath + path);
+ LOG.info("query URL {}", basePath + path);
+ List<T> ret = r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).get(type);
+ return ret;
+ }
+
+ @Override
+ public void close() throws IOException {
+ client.destroy();
+ }
+
+ @Override
+ public Collection<HBaseSensitivityEntity> listHBaseSensitivies() {
+ return list(METADATA_LIST_HBASE_SENSITIVITY_PATH, new GenericType<List<HBaseSensitivityEntity>>() {
+ });
+ }
+
+ @Override
+ public OpResult addHBaseSensitivity(Collection<HBaseSensitivityEntity> h) {
+ WebResource r = client.resource(basePath + METADATA_ADD_HBASE_SENSITIVITY_PATH);
+ r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(h);
+ return new OpResult();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/SensitivityMetadataResource.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/SensitivityMetadataResource.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/SensitivityMetadataResource.java
deleted file mode 100644
index 05440fb..0000000
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/SensitivityMetadataResource.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.service;
-
-import javax.ws.rs.Path;
-
-/**
- * Since 6/10/16.
- */
-@Path("/metadata/sensitivity")
-public class SensitivityMetadataResource {
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/topo/NewKafkaSourcedSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/topo/NewKafkaSourcedSpoutProvider.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/topo/NewKafkaSourcedSpoutProvider.java
index e60e59e..3401b3c 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/topo/NewKafkaSourcedSpoutProvider.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/topo/NewKafkaSourcedSpoutProvider.java
@@ -56,8 +56,6 @@ public class NewKafkaSourcedSpoutProvider implements StormSpoutProvider {
String groupId = context.getString("consumerGroupId");
// Kafka fetch size
int fetchSize = context.getInt("fetchSize");
- // Kafka deserializer class
- String deserClsName = context.getString("deserializerClass");
// Kafka broker zk connection
String zkConnString = context.getString("zkConnection");
// transaction zkRoot
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppConf.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppConf.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppConf.java
new file mode 100644
index 0000000..662311c
--- /dev/null
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppConf.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * * <p/>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p/>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.hbase;
+
+import org.apache.eagle.app.Configuration;
+
+/**
+ * Since 8/5/16.
+ */
+public class HBaseAuditLogAppConf extends Configuration{
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java
new file mode 100644
index 0000000..051d8c4
--- /dev/null
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * * <p/>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p/>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.hbase;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.app.spi.AbstractApplicationProvider;
+import org.apache.eagle.security.service.MetadataDaoFactory;
+
+/**
+ * Since 8/5/16.
+ */
+public class HBaseAuditLogAppProvider extends AbstractApplicationProvider<HBaseAuditLogApplication> {
+ public HBaseAuditLogAppProvider() {
+ super("/META-INF/metadata.xml");
+ }
+
+ @Override
+ public HBaseAuditLogApplication getApplication() {
+ return new HBaseAuditLogApplication();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
index 49393e1..3d80308 100644
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
@@ -16,27 +16,35 @@
*/
package org.apache.eagle.security.hbase;
+import backtype.storm.generated.StormTopology;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.sink.StormStreamSink;
import org.apache.eagle.security.topo.NewKafkaSourcedSpoutProvider;
import storm.kafka.StringScheme;
-import storm.kafka.bolt.KafkaBolt;
/**
* Since 7/27/16.
*/
-public class HBaseAuditLogApplication{
+public class HBaseAuditLogApplication extends StormApplication<HBaseAuditLogAppConf> {
public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
public final static String PARSER_TASK_NUM = "topology.numOfParserTasks";
public final static String JOIN_TASK_NUM = "topology.numOfJoinTasks";
public final static String SINK_TASK_NUM = "topology.numOfSinkTasks";
- protected void buildApp(TopologyBuilder builder) {
- System.setProperty("config.resource", "/application.conf");
- Config config = ConfigFactory.load();
+ @Override
+ public StormTopology execute(HBaseAuditLogAppConf config1, StormEnvironment environment) {
+ return null;
+ }
+
+ @Override
+ public StormTopology execute(Config config, StormEnvironment environment) {
+ TopologyBuilder builder = new TopologyBuilder();
NewKafkaSourcedSpoutProvider provider = new NewKafkaSourcedSpoutProvider();
IRichSpout spout = provider.getSpout(config);
@@ -55,8 +63,15 @@ public class HBaseAuditLogApplication{
BoltDeclarer joinBoltDeclarer = builder.setBolt("joinBolt", joinBolt, numOfJoinTasks);
joinBoltDeclarer.fieldsGrouping("parserBolt", new Fields("f1"));
- KafkaBolt kafkaBolt = new KafkaBolt();
- BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", kafkaBolt, numOfSinkTasks);
- kafkaBoltDeclarer.shuffleGrouping("joinBolt");
+ StormStreamSink sinkBolt = environment.getFlattenStreamSink("hbase_audit_log_stream",config);
+ BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", sinkBolt, numOfSinkTasks);
+ kafkaBoltDeclarer.fieldsGrouping("joinBolt", new Fields("user"));
+ return builder.createTopology();
+ }
+
+ public static void main(String[] args){
+ Config config = ConfigFactory.load();
+ HBaseAuditLogApplication app = new HBaseAuditLogApplication();
+ app.run(config);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogMonitoringMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogMonitoringMain.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogMonitoringMain.java
deleted file mode 100644
index 13ca214..0000000
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogMonitoringMain.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.security.hbase;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.security.topo.NewKafkaSourcedSpoutProvider;
-import org.apache.eagle.security.topo.TopologySubmitter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.*;
-import storm.kafka.bolt.KafkaBolt;
-
-public class HbaseAuditLogMonitoringMain {
- private static Logger LOG = LoggerFactory.getLogger(HbaseAuditLogMonitoringMain.class);
- public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
- public final static String PARSER_TASK_NUM = "topology.numOfParserTasks";
- public final static String JOIN_TASK_NUM = "topology.numOfJoinTasks";
- public final static String SINK_TASK_NUM = "topology.numOfSinkTasks";
-
- public static void main(String[] args) throws Exception{
- System.setProperty("config.resource", "/application.conf");
- Config config = ConfigFactory.load();
- NewKafkaSourcedSpoutProvider provider = new NewKafkaSourcedSpoutProvider();
- IRichSpout spout = provider.getSpout(config);
-
- HBaseAuditLogParserBolt bolt = new HBaseAuditLogParserBolt();
- TopologyBuilder builder = new TopologyBuilder();
-
- int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
- int numOfParserTasks = config.getInt(PARSER_TASK_NUM);
- int numOfJoinTasks = config.getInt(JOIN_TASK_NUM);
- int numOfSinkTasks = config.getInt(SINK_TASK_NUM);
-
- builder.setSpout("ingest", spout, numOfSpoutTasks);
- BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", bolt, numOfParserTasks);
- boltDeclarer.fieldsGrouping("ingest", new Fields(StringScheme.STRING_SCHEME_KEY));
-
- HbaseResourceSensitivityDataJoinBolt joinBolt = new HbaseResourceSensitivityDataJoinBolt(config);
- BoltDeclarer joinBoltDeclarer = builder.setBolt("joinBolt", joinBolt, numOfJoinTasks);
- joinBoltDeclarer.fieldsGrouping("parserBolt", new Fields("f1"));
-
- KafkaBolt kafkaBolt = new KafkaBolt();
- BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", kafkaBolt, numOfSinkTasks);
- kafkaBoltDeclarer.shuffleGrouping("joinBolt");
-
- StormTopology topology = builder.createTopology();
-
- TopologySubmitter.submit(topology, config);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java
index cf486d3..d8d9d6b 100644
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java
@@ -23,14 +23,12 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.typesafe.config.Config;
import org.apache.eagle.security.entity.HbaseResourceSensitivityAPIEntity;
import org.apache.eagle.security.util.ExternalDataCache;
import org.apache.eagle.security.util.ExternalDataJoiner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import java.util.Arrays;
import java.util.Map;
@@ -94,9 +92,7 @@ public class HbaseResourceSensitivityDataJoinBolt extends BaseRichBolt {
}
LOG.info("After hbase resource sensitivity lookup: " + newEvent);
// push to Kafka sink
- ObjectMapper mapper = new ObjectMapper();
- String msg = mapper.writeValueAsString(map);
- collector.emit(Arrays.asList(newEvent.get("user"), msg));
+ collector.emit(Arrays.asList(newEvent.get("user"), newEvent));
}catch(Exception ex){
LOG.error("error joining data, ignore it", ex);
}finally {
@@ -106,6 +102,6 @@ public class HbaseResourceSensitivityDataJoinBolt extends BaseRichBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_KEY, FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE));
+ declarer.declare(new Fields("user", "message"));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityPollingJob.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityPollingJob.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityPollingJob.java
index 9ca0701..603ed5a 100644
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityPollingJob.java
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityPollingJob.java
@@ -19,10 +19,8 @@ package org.apache.eagle.security.hbase;
import com.google.common.base.Function;
import com.google.common.collect.Maps;
-import org.apache.eagle.security.entity.HbaseResourceSensitivityAPIEntity;
-import org.apache.eagle.security.service.HBaseSensitivityEntity;
-import org.apache.eagle.security.service.ISecurityMetadataDAO;
-import org.apache.eagle.security.service.MetadataDaoFactory;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.security.service.*;
import org.apache.eagle.security.util.AbstractResourceSensitivityPollingJob;
import org.apache.eagle.security.util.ExternalDataCache;
import org.quartz.Job;
@@ -33,7 +31,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
public class HbaseResourceSensitivityPollingJob extends AbstractResourceSensitivityPollingJob implements Job {
@@ -44,12 +41,33 @@ public class HbaseResourceSensitivityPollingJob extends AbstractResourceSensitiv
throws JobExecutionException {
JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
try {
- ISecurityMetadataDAO dao = MetadataDaoFactory.getInstance().getMetadataDao();
- Collection<HBaseSensitivityEntity> sensitivityEntities = dao.listHBaseSensitivies();
- ExternalDataCache.getInstance().setJobResult(getClass(), sensitivityEntities);
+ Collection<HBaseSensitivityEntity> sensitivityEntities = load(jobDataMap);
+ Map<String, HBaseSensitivityEntity> map = Maps.uniqueIndex(
+ sensitivityEntities,
+ new Function<HBaseSensitivityEntity, String>() {
+ @Override
+ public String apply(HBaseSensitivityEntity input) {
+ return input.getHbaseResource();
+ }
+ });
+ ExternalDataCache.getInstance().setJobResult(getClass(), map);
} catch(Exception ex) {
LOG.error("Fail to load hbase resource sensitivity data", ex);
}
}
-}
\ No newline at end of file
+ private Collection<HBaseSensitivityEntity> load(JobDataMap jobDataMap) throws Exception {
+ Map<String, Object> map = (Map<String,Object>)jobDataMap.get(EagleConfigConstants.EAGLE_SERVICE);
+ String eagleServiceHost = (String)map.get(EagleConfigConstants.HOST);
+ Integer eagleServicePort = Integer.parseInt(map.get(EagleConfigConstants.PORT).toString());
+ String username = map.containsKey(EagleConfigConstants.USERNAME) ? (String)map.get(EagleConfigConstants.USERNAME) : null;
+ String password = map.containsKey(EagleConfigConstants.PASSWORD) ? (String)map.get(EagleConfigConstants.PASSWORD) : null;
+
+ // load from eagle database
+ LOG.info("Load hbase resource sensitivity information from eagle service "
+ + eagleServiceHost + ":" + eagleServicePort);
+
+ IMetadataServiceClient client = new MetadataServiceClientImpl(eagleServiceHost, eagleServicePort, "/rest");
+ return client.listHBaseSensitivies();
+ }
+}