You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2017/02/16 05:24:22 UTC
[2/2] eagle git commit: [EAGLE-895] Improve alert engine metadata to
organize by siteId
[EAGLE-895] Improve alert engine metadata to organize by siteId
https://issues.apache.org/jira/browse/EAGLE-895
Author: Hao Chen <ha...@apache.org>
Closes #801 from haoch/AddPolicySiteId.
Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/49ca3b0e
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/49ca3b0e
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/49ca3b0e
Branch: refs/heads/master
Commit: 49ca3b0ec481f6fcfbf339cb6d3b63b4dede1011
Parents: 7681287
Author: Hao Chen <ha...@apache.org>
Authored: Thu Feb 16 13:24:08 2017 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Thu Feb 16 13:24:08 2017 +0800
----------------------------------------------------------------------
.../eagle/alert/app/AlertEagleStorePlugin.java | 16 +-
.../app/AlertUnitTopologyAppProviderTest.java | 2 +-
.../engine/coordinator/PolicyDefinition.java | 20 +-
.../engine/coordinator/PublishmentType.java | 73 +-
.../coordinator/PolicyDefinitionTest.java | 2 +-
.../publisher/AlertPublishPluginProvider.java | 24 +
.../publisher/PublishementTypeLoader.java | 58 ++
.../publisher/impl/AlertEagleStorePlugin.java | 12 +-
.../publisher/impl/AlertEmailPublisher.java | 16 +-
.../publisher/impl/AlertFilePublisher.java | 13 +-
.../publisher/impl/AlertKafkaPublisher.java | 17 +-
.../publisher/impl/AlertSlackPublisher.java | 17 +-
.../publisher/PublishementTypeLoaderTest.java | 27 +
.../metadata/resource/MetadataResource.java | 51 +-
.../resource/StreamDefinitionWrapper.java | 72 ++
.../eagle/alert/metadata/IMetadataDao.java | 8 +
.../environment/impl/StormExecutionRuntime.java | 3 +-
.../ApplicationStatusUpdateServiceImpl.java | 73 +-
.../eagle/app/spi/ApplicationProvider.java | 2 +-
.../app/test/ApplicationSimulatorImpl.java | 3 +-
.../eagle/app/test/ApplicationTestBase.java | 25 +
.../app/resource/ApplicationResourceTest.java | 2 +-
eagle-core/eagle-common/pom.xml | 4 +
.../eagle/common/utils/ReflectionsHelper.java | 48 ++
.../eagle/service/hbase/EmbeddedHbase.java | 7 +-
.../eagle/service/hbase/EmbeddedHbaseTest.java | 6 +-
.../eagle/service/hbase/TestHBaseBase.java | 44 +-
.../service/ApplicationStatusUpdateService.java | 2 -
.../client/impl/EagleServiceClientImpl.java | 9 +-
.../eagle/service/client/ClientTestBase.java | 2 +-
.../eagle-query/eagle-entity-base/pom.xml | 28 +-
.../entity/repo/EntityRepositoryScanner.java | 92 ++-
.../TestGenericEntityIndexStreamReader.java | 14 +-
.../eagle/log/entity/TestTestLogAPIEntity.java | 735 ++++++++++---------
.../repo/TestEntityRepositoryScanner.java | 5 +-
.../eagle/storage/hbase/TestHBaseStatement.java | 17 +-
.../storage/hbase/TestWithHBaseCoprocessor.java | 77 ++
.../coprocessor/TestGroupAggregateClient.java | 60 +-
.../TestGroupAggregateTimeSeriesClient.java | 26 +-
.../storage/hbase/spi/TestHBaseStorage.java | 22 +-
...estHBaseStorageAggregateWithCoprocessor.java | 36 +-
.../hbase/spi/TestHBaseStorageLoader.java | 17 +-
.../src/test/resources/log4j.properties | 2 +-
.../example/ExampleApplicationProviderTest.java | 5 +-
.../eagle/app/jpm/JPMWebApplicationTest.java | 1 +
.../auditlog/TestHdfsAuditLogApplication.java | 7 +-
.../apache/eagle/server/ServerApplication.java | 8 +
pom.xml | 14 +-
48 files changed, 1190 insertions(+), 634 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertEagleStorePlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertEagleStorePlugin.java b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertEagleStorePlugin.java
index 30d2b78..0b58bf7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertEagleStorePlugin.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertEagleStorePlugin.java
@@ -19,7 +19,9 @@ package org.apache.eagle.alert.app;
import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.PublishmentType;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertPublishPluginProvider;
import org.apache.eagle.alert.engine.publisher.impl.AbstractPublishPlugin;
import org.apache.eagle.alert.utils.AlertConstants;
import org.apache.eagle.metadata.model.AlertEntity;
@@ -36,15 +38,14 @@ import java.util.Map;
import static org.apache.eagle.alert.engine.model.AlertPublishEvent.*;
-public class AlertEagleStorePlugin extends AbstractPublishPlugin {
+public class AlertEagleStorePlugin extends AbstractPublishPlugin implements AlertPublishPluginProvider {
private static final Logger LOG = LoggerFactory.getLogger(AlertEagleStorePlugin.class);
private IEagleServiceClient client;
@Override
public void init(Config config, Publishment publishment, Map conf) throws Exception {
super.init(config, publishment, conf);
- client = new EagleServiceClientImpl(config.getString("service.host"), config.getInt("service.port"),
- config.getString("service.username"), config.getString("service.password"));
+ client = new EagleServiceClientImpl(config);
}
@Override
@@ -94,4 +95,13 @@ public class AlertEagleStorePlugin extends AbstractPublishPlugin {
alertEvent.setTags(tags);
return alertEvent;
}
+
+ @Override
+ public PublishmentType getPluginType() {
+ return new PublishmentType.Builder()
+ .name("HBaseStorage")
+ .type(getClass())
+ .description("HBase Storage alert publisher")
+ .build();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert-app/src/test/java/org/apache/eagle/alert/app/AlertUnitTopologyAppProviderTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/test/java/org/apache/eagle/alert/app/AlertUnitTopologyAppProviderTest.java b/eagle-core/eagle-alert-parent/eagle-alert-app/src/test/java/org/apache/eagle/alert/app/AlertUnitTopologyAppProviderTest.java
index 927d505..4383484 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert-app/src/test/java/org/apache/eagle/alert/app/AlertUnitTopologyAppProviderTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/test/java/org/apache/eagle/alert/app/AlertUnitTopologyAppProviderTest.java
@@ -56,7 +56,7 @@ public class AlertUnitTopologyAppProviderTest extends ApplicationTestBase {
statusUpdateService.updateApplicationEntityStatus(applicationEntity);
// Stop application
applicationResource.stopApplication(new ApplicationOperations.StopOperation(applicationEntity.getUuid()));
- statusUpdateService.updateApplicationEntityStatus(applicationEntity);
+ awaitApplicationStop(applicationEntity);
// Uninstall application
applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid()));
try {
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
index 7398dd5..c377e41 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
@@ -35,8 +35,9 @@ public class PolicyDefinition implements Serializable {
@Length(min = 1, max = 50, message = "length should between 1 and 50")
private String name;
private String description;
- private List<String> inputStreams = new ArrayList<String>();
- private List<String> outputStreams = new ArrayList<String>();
+ private List<String> inputStreams = new ArrayList<>();
+ private List<String> outputStreams = new ArrayList<>();
+ private String siteId = "default";
private Definition definition;
private Definition stateDefinition;
@@ -137,6 +138,7 @@ public class PolicyDefinition implements Serializable {
@Override
public int hashCode() {
return new HashCodeBuilder()
+ .append(siteId)
.append(name)
.append(inputStreams)
.append(outputStreams)
@@ -160,7 +162,8 @@ public class PolicyDefinition implements Serializable {
PolicyDefinition another = (PolicyDefinition) that;
- if (Objects.equals(another.name, this.name)
+ if (Objects.equals(another.siteId, this.siteId)
+ && Objects.equals(another.name, this.name)
&& Objects.equals(another.description, this.description)
&& CollectionUtils.isEqualCollection(another.inputStreams, this.inputStreams)
&& CollectionUtils.isEqualCollection(another.outputStreams, this.outputStreams)
@@ -191,6 +194,14 @@ public class PolicyDefinition implements Serializable {
return alertDefinition == null ? null : alertDefinition.getCategory();
}
+ public String getSiteId() {
+ return siteId;
+ }
+
+ public void setSiteId(String siteId) {
+ this.siteId = siteId;
+ }
+
@JsonIgnoreProperties(ignoreUnknown = true)
public static class Definition implements Serializable {
private static final long serialVersionUID = -622366527887848346L;
@@ -294,9 +305,8 @@ public class PolicyDefinition implements Serializable {
ENABLED, DISABLED
}
-
@Override
public String toString() {
- return String.format("{name=\"%s\",definition=%s}", this.getName(), this.getDefinition() == null ? "null" : this.getDefinition().toString());
+ return String.format("{site=\"%s\", name=\"%s\",definition=%s}", this.getSiteId(), this.getName(), this.getDefinition() == null ? "null" : this.getDefinition().toString());
}
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
index 5bd15bc..f7025f2 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
@@ -21,16 +21,25 @@ package org.apache.eagle.alert.engine.coordinator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.commons.lang3.builder.HashCodeBuilder;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
@JsonIgnoreProperties(ignoreUnknown = true)
public class PublishmentType {
private String name;
+
+ @Override
+ public String toString() {
+ return "PublishmentType{"
+ + "name='" + name + '\''
+ + ", type='" + type + '\''
+ + ", description='" + description + '\''
+ + ", fields=" + fields
+ + '}';
+ }
+
private String type;
private String description;
- private List<Map<String, String>> fields;
+ private List<Map<String, String>> fields = new LinkedList<>();
public String getName() {
return name;
@@ -64,6 +73,8 @@ public class PublishmentType {
this.fields = fields;
}
+
+
@Override
public boolean equals(Object obj) {
if (obj instanceof PublishmentType) {
@@ -85,4 +96,56 @@ public class PublishmentType {
.append(fields)
.build();
}
-}
+
+
+ public static class Builder {
+ private final PublishmentType publishmentType;
+
+ public Builder() {
+ this.publishmentType = new PublishmentType();
+ }
+
+ public Builder type(Class<?> typeClass) {
+ this.publishmentType.setType(typeClass.getName());
+ return this;
+ }
+
+ public Builder name(String name) {
+ this.publishmentType.setName(name);
+ return this;
+ }
+
+ public Builder description(String description) {
+ this.publishmentType.setDescription(description);
+ return this;
+ }
+
+ public Builder field(Map<String,String> fieldDesc) {
+ this.publishmentType.getFields().add(fieldDesc);
+ return this;
+ }
+
+ public Builder field(String name, String value) {
+ this.publishmentType.getFields().add(new HashMap<String,String>() {
+ {
+ put("name", name);
+ put("value", value);
+ }
+ });
+ return this;
+ }
+
+ public Builder field(String name) {
+ this.publishmentType.getFields().add(new HashMap<String,String>() {
+ {
+ put("name", name);
+ }
+ });
+ return this;
+ }
+
+ public PublishmentType build() {
+ return this.publishmentType;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java
index 7acb4f7..77b3517 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java
@@ -76,7 +76,7 @@ public class PolicyDefinitionTest {
sp.setColumns(Arrays.asList("host"));
sp.setType(StreamPartition.Type.GROUPBY);
pd.addPartition(sp);
- Assert.assertEquals("{name=\"policyName\",definition={type=\"siddhi\",value=\"test\", inputStreams=\"[inputStream]\", outputStreams=\"[outputStream]\" }}", pd.toString());
+ Assert.assertEquals("{site=\"default\", name=\"policyName\",definition={type=\"siddhi\",value=\"test\", inputStreams=\"[inputStream]\", outputStreams=\"[outputStream]\" }}", pd.toString());
PolicyDefinition pd1 = new PolicyDefinition();
PolicyDefinition.Definition def1 = new PolicyDefinition.Definition();
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPluginProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPluginProvider.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPluginProvider.java
new file mode 100644
index 0000000..77eea40
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPluginProvider.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.publisher;
+
+import org.apache.eagle.alert.engine.coordinator.PublishmentType;
+
+public interface AlertPublishPluginProvider {
+ PublishmentType getPluginType();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoader.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoader.java
new file mode 100644
index 0000000..820d70e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.publisher;
+
+import org.apache.eagle.alert.engine.coordinator.PublishmentType;
+import org.apache.eagle.common.utils.ReflectionsHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class PublishementTypeLoader {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PublishementTypeLoader.class);
+
+ private final List<PublishmentType> publishmentTypeSet;
+
+ private PublishementTypeLoader() {
+ this.publishmentTypeSet = new LinkedList<>();
+ LOGGER.info("Loading alert publish plugins ...");
+ for (Class<? extends AlertPublishPluginProvider> clazz: ReflectionsHelper.getInstance().getSubTypesOf(AlertPublishPluginProvider.class)) {
+ LOGGER.debug("Loading alert publish plugin: {}", clazz);
+ try {
+ PublishmentType type = clazz.newInstance().getPluginType();
+ this.publishmentTypeSet.add(type);
+ LOGGER.info("Loaded alert publish plugin {}:{}", type.getName(), type.getType());
+ } catch (InstantiationException | IllegalAccessException e) {
+ LOGGER.error("Failed to get instantiate alert publish plugin provider: {}", clazz, e);
+ }
+ }
+ LOGGER.info("Loaded {} alert publish plugins", this.publishmentTypeSet.size());
+ }
+
+ private static final PublishementTypeLoader INSTANCE = new PublishementTypeLoader();
+
+ public static List<PublishmentType> loadPublishmentTypes() {
+ return INSTANCE.getPublishmentTypes();
+ }
+
+ public List<PublishmentType> getPublishmentTypes() {
+ return publishmentTypeSet;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java
index 48c3663..b410cda 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java
@@ -20,8 +20,10 @@ package org.apache.eagle.alert.engine.publisher.impl;
import com.typesafe.config.Config;
import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.PublishmentType;
import org.apache.eagle.alert.engine.model.AlertPublishEvent;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertPublishPluginProvider;
import org.apache.eagle.alert.engine.publisher.PublishConstants;
import org.apache.eagle.alert.service.IMetadataServiceClient;
import org.apache.eagle.alert.service.MetadataServiceClientImpl;
@@ -34,7 +36,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-public class AlertEagleStorePlugin extends AbstractPublishPlugin {
+public class AlertEagleStorePlugin extends AbstractPublishPlugin implements AlertPublishPluginProvider {
private static Logger LOG = LoggerFactory.getLogger(AlertEagleStorePlugin.class);
private transient IMetadataServiceClient client;
@@ -72,4 +74,12 @@ public class AlertEagleStorePlugin extends AbstractPublishPlugin {
return LOG;
}
+ @Override
+ public PublishmentType getPluginType() {
+ return new PublishmentType.Builder()
+ .name("JDBCStorage")
+ .type(getClass())
+ .description("Publish alerts into eagle metadata store")
+ .build();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
index d08d114..152a9f1 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
@@ -19,7 +19,9 @@
package org.apache.eagle.alert.engine.publisher.impl;
import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.PublishmentType;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertPublishPluginProvider;
import org.apache.eagle.alert.engine.publisher.PublishConstants;
import org.apache.eagle.alert.engine.publisher.email.AlertEmailConstants;
import org.apache.eagle.alert.engine.publisher.email.AlertEmailGenerator;
@@ -41,7 +43,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.eagle.alert.service.MetadataServiceClientImpl.*;
import static org.apache.eagle.common.mail.AlertEmailConstants.*;
-public class AlertEmailPublisher extends AbstractPublishPlugin {
+public class AlertEmailPublisher extends AbstractPublishPlugin implements AlertPublishPluginProvider {
private static final Logger LOG = LoggerFactory.getLogger(AlertEmailPublisher.class);
private static final int DEFAULT_THREAD_POOL_CORE_SIZE = 4;
@@ -206,4 +208,16 @@ public class AlertEmailPublisher extends AbstractPublishPlugin {
protected Logger getLogger() {
return LOG;
}
+
+ @Override
+ public PublishmentType getPluginType() {
+ return new PublishmentType.Builder()
+ .name("Email")
+ .type(AlertEmailPublisher.class)
+ .description("Email alert publisher")
+ .field("subject")
+ .field("sender")
+ .field("recipients")
+ .build();
+ }
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertFilePublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertFilePublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertFilePublisher.java
index 1848979..375a0da 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertFilePublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertFilePublisher.java
@@ -19,8 +19,10 @@
package org.apache.eagle.alert.engine.publisher.impl;
import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.PublishmentType;
import org.apache.eagle.alert.engine.model.AlertPublishEvent;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertPublishPluginProvider;
import org.apache.eagle.alert.engine.publisher.PublishConstants;
import org.apache.eagle.common.DateTimeUtil;
@@ -33,7 +35,7 @@ import java.util.List;
import java.util.Map;
import java.util.logging.*;
-public class AlertFilePublisher extends AbstractPublishPlugin {
+public class AlertFilePublisher extends AbstractPublishPlugin implements AlertPublishPluginProvider {
private Logger filelogger = Logger.getLogger(AlertFilePublisher.class.getName());
private FileHandler handler;
@@ -67,6 +69,15 @@ public class AlertFilePublisher extends AbstractPublishPlugin {
filelogger.setUseParentHandlers(false);
}
+ @Override
+ public PublishmentType getPluginType() {
+ return new PublishmentType.Builder()
+ .name("File")
+ .type(AlertFilePublisher.class)
+ .description("Local log file publisher")
+ .build();
+ }
+
class AlertFileFormatter extends Formatter {
@Override
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
index e48f2eb..adac1aa 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
@@ -26,7 +26,9 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.PublishmentType;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertPublishPluginProvider;
import org.apache.eagle.alert.engine.publisher.PublishConstants;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -37,7 +39,7 @@ import org.slf4j.LoggerFactory;
import com.typesafe.config.Config;
-public class AlertKafkaPublisher extends AbstractPublishPlugin {
+public class AlertKafkaPublisher extends AbstractPublishPlugin implements AlertPublishPluginProvider {
private static final Logger LOG = LoggerFactory.getLogger(AlertKafkaPublisher.class);
private static final long MAX_TIMEOUT_MS = 60000;
@@ -181,4 +183,15 @@ public class AlertKafkaPublisher extends AbstractPublishPlugin {
protected Logger getLogger() {
return LOG;
}
-}
+
+ @Override
+ public PublishmentType getPluginType() {
+ return new PublishmentType.Builder()
+ .name("Kafka")
+ .type(getClass())
+ .description("Kafka alert publisher")
+ .field("kafka_broker","localhost:9092")
+ .field("topic")
+ .build();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java
index 6ce6ed7..0d60246 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java
@@ -25,8 +25,10 @@ import com.ullink.slack.simpleslackapi.SlackSession;
import com.ullink.slack.simpleslackapi.impl.SlackSessionFactory;
import org.apache.commons.lang.StringUtils;
import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.PublishmentType;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertPublishPluginProvider;
import org.apache.eagle.alert.engine.publisher.PublishConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +41,7 @@ import java.util.Map;
/**
* @since Sep 14, 2016.
*/
-public class AlertSlackPublisher extends AbstractPublishPlugin {
+public class AlertSlackPublisher extends AbstractPublishPlugin implements AlertPublishPluginProvider {
private static final Logger LOG = LoggerFactory.getLogger(AlertSlackPublisher.class);
private SlackSession session;
@@ -155,4 +157,17 @@ public class AlertSlackPublisher extends AbstractPublishPlugin {
SlackChannel channel = session.findChannelByName(channelName);
session.sendMessage(channel, message, attachment);
}
+
+ @Override
+ public PublishmentType getPluginType() {
+ return new PublishmentType.Builder()
+ .name("Slack")
+ .type(getClass())
+ .description("Slack alert publisher")
+ .field("token")
+ .field("channels")
+ .field("severitys")
+ .field("urltemplate")
+ .build();
+ }
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoaderTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoaderTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoaderTest.java
new file mode 100644
index 0000000..3df5fc8
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoaderTest.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.publisher;
+
+import org.junit.Test;
+
+public class PublishementTypeLoaderTest {
+ @Test
+ public void testPublishmentTypeLoader() {
+ PublishementTypeLoader.loadPublishmentTypes();
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
index 617b4f0..2d30e85 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
@@ -16,6 +16,9 @@
*/
package org.apache.eagle.service.metadata.resource;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import org.apache.commons.lang3.StringUtils;
import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
import org.apache.eagle.alert.coordination.model.ScheduleState;
import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
@@ -25,19 +28,20 @@ import org.apache.eagle.alert.engine.interpreter.PolicyInterpreter;
import org.apache.eagle.alert.engine.interpreter.PolicyParseResult;
import org.apache.eagle.alert.engine.interpreter.PolicyValidationResult;
import org.apache.eagle.alert.engine.model.AlertPublishEvent;
+import org.apache.eagle.alert.engine.publisher.PublishementTypeLoader;
import org.apache.eagle.alert.metadata.IMetadataDao;
import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory;
import org.apache.eagle.alert.metadata.resource.Models;
import org.apache.eagle.alert.metadata.resource.OpResult;
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
import javax.validation.Valid;
import javax.ws.rs.*;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
/**
* @since Apr 11, 2016.
@@ -135,6 +139,27 @@ public class MetadataResource {
return dao.createStream(stream);
}
+ @Path("/streams/create")
+ @POST
+ public OpResult createStream(StreamDefinitionWrapper stream) {
+ Preconditions.checkNotNull(stream.getStreamDefinition(),"Stream definition is null");
+ Preconditions.checkNotNull(stream.getStreamSource(),"Stream source is null");
+ stream.validateAndEnsureDefault();
+ OpResult createStreamResult = dao.createStream(stream.getStreamDefinition());
+ OpResult createDataSourceResult = dao.addDataSource(stream.getStreamSource());
+ // TODO: Check kafka topic exist or not.
+ if (createStreamResult.code == OpResult.SUCCESS
+ && createDataSourceResult.code == OpResult.SUCCESS) {
+ return OpResult.success("Successfully create stream "
+ + stream.getStreamDefinition().getStreamId()
+ + ", and datasource "
+ + stream.getStreamSource().getName());
+ } else {
+ return OpResult.fail("Error: "
+ + StringUtils.join(new String[]{createDataSourceResult.message, createDataSourceResult.message},","));
+ }
+ }
+
@Path("/streams/batch")
@POST
public List<OpResult> addStreams(List<StreamDefinition> streams) {
@@ -201,8 +226,12 @@ public class MetadataResource {
@Path("/policies")
@GET
- public List<PolicyDefinition> listPolicies() {
- return dao.listPolicies();
+ public List<PolicyDefinition> listPolicies(@QueryParam("siteId") String siteId) {
+ if (siteId != null) {
+ return dao.getPoliciesBySiteId(siteId);
+ } else {
+ return dao.listPolicies();
+ }
}
@Path("/policies")
@@ -281,7 +310,7 @@ public class MetadataResource {
try {
PolicyDefinition policyDefinition = getPolicyById(policyId);
policyDefinition.setPolicyStatus(status);
- OpResult updateResult = addPolicy(policyDefinition);
+ OpResult updateResult = addPolicy(policyDefinition);
result.code = updateResult.code;
if (result.code == OpResult.SUCCESS) {
@@ -292,7 +321,7 @@ public class MetadataResource {
LOG.error(result.message);
}
} catch (Exception e) {
- LOG.error("Error: " + e.getMessage(),e);
+ LOG.error("Error: " + e.getMessage(), e);
result.code = OpResult.FAILURE;
result.message = e.getMessage();
}
@@ -350,17 +379,19 @@ public class MetadataResource {
@Path("/publishmentTypes")
@GET
public List<PublishmentType> listPublishmentType() {
- return dao.listPublishmentType();
+ return PublishementTypeLoader.loadPublishmentTypes();
}
@Path("/publishmentTypes")
@POST
+ @Deprecated
public OpResult addPublishmentType(PublishmentType publishmentType) {
return dao.addPublishmentType(publishmentType);
}
@Path("/publishmentTypes/batch")
@POST
+ @Deprecated
public List<OpResult> addPublishmentTypes(List<PublishmentType> publishmentTypes) {
List<OpResult> results = new LinkedList<>();
for (PublishmentType pubType : publishmentTypes) {
@@ -371,12 +402,14 @@ public class MetadataResource {
@Path("/publishmentTypes/{name}")
@DELETE
+ @Deprecated
public OpResult removePublishmentType(@PathParam("name") String name) {
return dao.removePublishmentType(name);
}
@Path("/publishmentTypes")
@DELETE
+ @Deprecated
public List<OpResult> removePublishmentTypes(List<String> pubTypes) {
List<OpResult> results = new LinkedList<>();
for (String pubType : pubTypes) {
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/StreamDefinitionWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/StreamDefinitionWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/StreamDefinitionWrapper.java
new file mode 100644
index 0000000..738c978
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/StreamDefinitionWrapper.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.service.metadata.resource;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector;
+
+import java.util.Properties;
+
+public class StreamDefinitionWrapper {
+ private Kafka2TupleMetadata streamSource;
+ private StreamDefinition streamDefinition;
+
+ public Kafka2TupleMetadata getStreamSource() {
+ return streamSource;
+ }
+
+ public void setStreamSource(Kafka2TupleMetadata streamSource) {
+ this.streamSource = streamSource;
+ }
+
+ public StreamDefinition getStreamDefinition() {
+ return streamDefinition;
+ }
+
+ public void setStreamDefinition(StreamDefinition streamDefinition) {
+ this.streamDefinition = streamDefinition;
+ }
+
+ public void validateAndEnsureDefault() {
+ Preconditions.checkNotNull(streamSource);
+ Preconditions.checkNotNull(streamDefinition);
+ if (streamSource.getType() == null) {
+ streamSource.setType("KAFKA");
+ }
+ String dataSourceName = (getStreamDefinition().getStreamId() + "_CUSTOMIZED").toUpperCase();
+ getStreamDefinition().setDataSource(dataSourceName);
+ getStreamSource().setName(dataSourceName);
+ Tuple2StreamMetadata codec = new Tuple2StreamMetadata();
+ codec.setTimestampColumn("timestamp");
+ codec.setStreamNameSelectorCls(JsonStringStreamNameSelector.class.getName());
+ Properties streamNameSelectorProp = new Properties();
+ streamNameSelectorProp.put("userProvidedStreamName", streamSource.getName());
+ codec.setStreamNameSelectorProp(streamNameSelectorProp);
+ if (StringUtils.isBlank(codec.getStreamNameSelectorCls())) {
+ codec.setStreamNameSelectorCls(JsonStringStreamNameSelector.class.getName());
+ }
+ if (StringUtils.isBlank(codec.getTimestampFormat())) {
+ codec.setTimestampFormat(null);
+ }
+ this.streamSource.setCodec(codec);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
index 2dc7f51..2d2a90f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
@@ -75,10 +75,13 @@ public interface IMetadataDao extends Closeable {
OpResult removePublishment(String pubId);
+ @Deprecated
List<PublishmentType> listPublishmentType();
+ @Deprecated
OpResult addPublishmentType(PublishmentType publishmentType);
+ @Deprecated
OpResult removePublishmentType(String pubType);
List<AlertPublishEvent> listAlertPublishEvent(int size);
@@ -190,4 +193,9 @@ public interface IMetadataDao extends Closeable {
}
return result;
}
+
+ default List<PolicyDefinition> getPoliciesBySiteId(String siteId) {
+ Preconditions.checkNotNull(siteId,"siteId");
+ return listPolicies().stream().filter(pc -> pc.getSiteId().equals(siteId)).collect(Collectors.toList());
+ }
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
index f61a291..2b4180d 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
@@ -186,9 +186,10 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
} else if (topologySummary.get_status().equalsIgnoreCase("INACTIVE")) {
status = ApplicationEntity.Status.STOPPED;
} else if (topologySummary.get_status().equalsIgnoreCase("KILLED")) {
- status = ApplicationEntity.Status.STOPPED;
+ status = ApplicationEntity.Status.STOPPING;
} else {
LOG.error("Unknown storm topology ({}) status: {}", topologySummary.get_status(),topologySummary.get_status());
+ status = ApplicationEntity.Status.UNKNOWN;
}
}
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationStatusUpdateServiceImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationStatusUpdateServiceImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationStatusUpdateServiceImpl.java
index 02c3a5e..b5bec1b 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationStatusUpdateServiceImpl.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationStatusUpdateServiceImpl.java
@@ -71,51 +71,60 @@ public class ApplicationStatusUpdateServiceImpl extends ApplicationStatusUpdateS
}
@Override
- public void updateApplicationEntityStatus(Collection<ApplicationEntity> applicationEntities) {
- }
-
- @Override
public void updateApplicationEntityStatus(ApplicationEntity applicationEntity) {
String appUuid = applicationEntity.getUuid();
- ApplicationEntity.Status currentStatus = applicationEntity.getStatus();
+ ApplicationEntity.Status preStatus = applicationEntity.getStatus();
try {
- ApplicationEntity.Status topologyStatus = applicationManagementService.getStatus(new ApplicationOperations.CheckStatusOperation(appUuid));
- if (currentStatus == ApplicationEntity.Status.STARTING) {
- if (topologyStatus == ApplicationEntity.Status.RUNNING) {
- applicationEntityService.delete(applicationEntity);
- applicationEntity.setStatus(ApplicationEntity.Status.RUNNING);
- applicationEntityService.create(applicationEntity);
+ ApplicationEntity.Status currentStatus = applicationManagementService.getStatus(new ApplicationOperations.CheckStatusOperation(appUuid));
+ if (preStatus == ApplicationEntity.Status.STARTING) {
+ if (currentStatus == ApplicationEntity.Status.RUNNING) {
+ // applicationEntityService.delete(applicationEntity);
+ // applicationEntity.setStatus(ApplicationEntity.Status.RUNNING);
+ // applicationEntityService.create(applicationEntity);
+ currentStatus = ApplicationEntity.Status.RUNNING;
// handle the topology corruption case:
- } else if (topologyStatus == ApplicationEntity.Status.REMOVED) {
- applicationEntityService.delete(applicationEntity);
- applicationEntity.setStatus(ApplicationEntity.Status.INITIALIZED);
- applicationEntityService.create(applicationEntity);
+ } else if (currentStatus == ApplicationEntity.Status.REMOVED) {
+ // applicationEntityService.delete(applicationEntity);
+ // applicationEntity.setStatus(ApplicationEntity.Status.INITIALIZED);
+ // applicationEntityService.create(applicationEntity);
+ currentStatus = ApplicationEntity.Status.INITIALIZED;
}
- } else if (currentStatus == ApplicationEntity.Status.STOPPING) {
- if (topologyStatus == ApplicationEntity.Status.REMOVED) {
- applicationEntityService.delete(applicationEntity);
- applicationEntity.setStatus(ApplicationEntity.Status.INITIALIZED);
- applicationEntityService.create(applicationEntity);
+ } else if (preStatus == ApplicationEntity.Status.STOPPING) {
+ if (currentStatus == ApplicationEntity.Status.REMOVED) {
+ // applicationEntityService.delete(applicationEntity);
+ // applicationEntity.setStatus(ApplicationEntity.Status.INITIALIZED);
+ // applicationEntityService.create(applicationEntity);
+ currentStatus = ApplicationEntity.Status.INITIALIZED;
}
- } else if (currentStatus == ApplicationEntity.Status.RUNNING) {
+ } else if (preStatus == ApplicationEntity.Status.RUNNING) {
// handle the topology corruption case:
- if (topologyStatus == ApplicationEntity.Status.REMOVED) {
- applicationEntityService.delete(applicationEntity);
- applicationEntity.setStatus(ApplicationEntity.Status.INITIALIZED);
- applicationEntityService.create(applicationEntity);
+ if (currentStatus == ApplicationEntity.Status.REMOVED) {
+ // applicationEntityService.delete(applicationEntity);
+ // applicationEntity.setStatus(ApplicationEntity.Status.INITIALIZED);
+ // applicationEntityService.create(applicationEntity);
+ currentStatus = ApplicationEntity.Status.INITIALIZED;
}
- } else if (currentStatus == ApplicationEntity.Status.INITIALIZED) {
+ } else if (preStatus == ApplicationEntity.Status.INITIALIZED) {
//corner case: when Storm service go down, app status-> initialized,
//then when storm server is up again, storm topology will be launched automatically->active
- if (topologyStatus == ApplicationEntity.Status.RUNNING) {
- applicationEntityService.delete(applicationEntity);
- applicationEntity.setStatus(ApplicationEntity.Status.RUNNING);
- applicationEntityService.create(applicationEntity);
+ if (currentStatus == ApplicationEntity.Status.RUNNING) {
+ // applicationEntityService.delete(applicationEntity);
+ // applicationEntity.setStatus(ApplicationEntity.Status.RUNNING);
+ // applicationEntityService.create(applicationEntity);
+ currentStatus = ApplicationEntity.Status.RUNNING;
}
}
- // "STOPPED" is not used in Eagle, so just do nothing.
- applicationEntity.setStatus(topologyStatus);
+ if (currentStatus == ApplicationEntity.Status.REMOVED) {
+ currentStatus = ApplicationEntity.Status.INITIALIZED;
+ }
+
+ // "STOPPED" is not used in Eagle, so just do nothing.
+ if (preStatus != currentStatus) {
+ LOG.info("Application {} status changed from {} to {}", applicationEntity.getAppId(), preStatus, currentStatus);
+ }
+ applicationEntity.setStatus(currentStatus);
+ applicationEntityService.update(applicationEntity);
} catch (RuntimeException e) {
LOG.error(e.getMessage(), e);
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
index eff232a..0172498 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
@@ -30,7 +30,7 @@ import java.util.List;
import java.util.Optional;
/**
- * Application Service KafkaStreamMessaging Interface.
+ * Application Service Provider Interface (SPI)
*
* @param <T> Application Type.
*/
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
index 1b066ef..a5f5a73 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
@@ -79,7 +79,8 @@ public class ApplicationSimulatorImpl extends ApplicationSimulator {
while (attempt < 10) {
attempt++;
statusUpdateService.updateApplicationEntityStatus(applicationEntity);
- if (applicationEntity.getStatus() == ApplicationEntity.Status.STOPPED) {
+ if (applicationEntity.getStatus() == ApplicationEntity.Status.STOPPED
+ || applicationEntity.getStatus() == ApplicationEntity.Status.INITIALIZED) {
break;
} else {
try {
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestBase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestBase.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestBase.java
index 6bc73fc..52b8e79 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestBase.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestBase.java
@@ -17,12 +17,20 @@
package org.apache.eagle.app.test;
import com.google.inject.Guice;
+import com.google.inject.Inject;
import com.google.inject.Injector;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.metadata.service.ApplicationStatusUpdateService;
+import org.junit.Assert;
import org.junit.Before;
public class ApplicationTestBase {
private Injector injector;
+
+ @Inject
+ ApplicationStatusUpdateService statusUpdateService;
+
@Before
public void setUp() {
injector = Guice.createInjector(new ApplicationTestGuiceModule());
@@ -32,4 +40,21 @@ public class ApplicationTestBase {
protected Injector injector() {
return injector;
}
+
+ protected void awaitApplicationStop(ApplicationEntity applicationEntity) throws InterruptedException {
+ int attempt = 0;
+ while (attempt < 10) {
+ attempt ++;
+ if (applicationEntity.getStatus() == ApplicationEntity.Status.STOPPED
+ || applicationEntity.getStatus() == ApplicationEntity.Status.INITIALIZED) {
+ break;
+ } else {
+ statusUpdateService.updateApplicationEntityStatus(applicationEntity);
+ Thread.sleep(1000);
+ }
+ }
+ if (attempt > 10) {
+ Assert.fail("Failed to wait for application to STOPPED after 10 attempts");
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/resource/ApplicationResourceTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/resource/ApplicationResourceTest.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/resource/ApplicationResourceTest.java
index 59925fd..6c68cd2 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/resource/ApplicationResourceTest.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/resource/ApplicationResourceTest.java
@@ -59,7 +59,7 @@ public class ApplicationResourceTest extends ApplicationTestBase {
statusUpdateService.updateApplicationEntityStatus(applicationEntity);
// Stop application
applicationResource.stopApplication(new ApplicationOperations.StopOperation(applicationEntity.getUuid()));
- statusUpdateService.updateApplicationEntityStatus(applicationEntity);
+ awaitApplicationStop(applicationEntity);
// Uninstall application
applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid()));
try {
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-common/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/pom.xml b/eagle-core/eagle-common/pom.xml
index 6ab250e..2b72f44 100644
--- a/eagle-core/eagle-common/pom.xml
+++ b/eagle-core/eagle-common/pom.xml
@@ -105,6 +105,10 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.reflections</groupId>
+ <artifactId>reflections</artifactId>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/utils/ReflectionsHelper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/utils/ReflectionsHelper.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/utils/ReflectionsHelper.java
new file mode 100644
index 0000000..facf07a
--- /dev/null
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/utils/ReflectionsHelper.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.common.utils;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.reflections.Reflections;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReflectionsHelper {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ReflectionsHelper.class);
+ private final Reflections reflections;
+ private static final String DEFAULT_PACKAGE = "org.apache.eagle";
+
+ private ReflectionsHelper() {
+ Config config = ConfigFactory.load();
+ String[] packages;
+ if (config.hasPath("scanPackages")) {
+ packages = config.getString("scanPackages").split(",");
+ } else {
+ packages = new String[]{DEFAULT_PACKAGE};
+ }
+ LOGGER.info("Scanning packages: {}", packages);
+ this.reflections = new Reflections(packages);
+ }
+
+ private static ReflectionsHelper INSTANCE = new ReflectionsHelper();
+
+ public static Reflections getInstance() {
+ return INSTANCE.reflections;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/EmbeddedHbase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/EmbeddedHbase.java b/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/EmbeddedHbase.java
index 84661e9..0aeac2c 100644
--- a/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/EmbeddedHbase.java
+++ b/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/EmbeddedHbase.java
@@ -23,6 +23,9 @@ import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
+@Deprecated
public class EmbeddedHbase {
private HBaseTestingUtility util;
private MiniHBaseCluster hbaseCluster;
@@ -58,7 +61,7 @@ public class EmbeddedHbase {
return getInstance(null);
}
- private EmbeddedHbase() {
+ public EmbeddedHbase() {
this(DEFAULT_PORT, DEFAULT_ZNODE);
}
@@ -115,7 +118,7 @@ public class EmbeddedHbase {
public void createTable(String tableName, String cf) {
try {
util.createTable(tableName, cf);
- } catch (Exception ex) {
+ } catch (IOException ex) {
LOG.warn("Create table failed, probably table already existed, table name: " + tableName);
}
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/EmbeddedHbaseTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/EmbeddedHbaseTest.java b/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/EmbeddedHbaseTest.java
index e65f062..ee9d32d 100644
--- a/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/EmbeddedHbaseTest.java
+++ b/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/EmbeddedHbaseTest.java
@@ -18,9 +18,11 @@ package org.apache.eagle.service.hbase;
import org.junit.Test;
+import java.io.IOException;
+
public class EmbeddedHbaseTest extends TestHBaseBase {
@Test
- public void testHBaseCreateTable() {
- // hbase.createTable("test_hbase_table","f");
+ public void testHBaseCreateTable() throws IOException {
+ hbase.createTable("test_hbase_table","f");
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/TestHBaseBase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/TestHBaseBase.java b/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/TestHBaseBase.java
index 31af2a1..35c0a38 100644
--- a/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/TestHBaseBase.java
+++ b/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/TestHBaseBase.java
@@ -17,30 +17,48 @@
package org.apache.eagle.service.hbase;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.junit.AfterClass;
-import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-
-@Ignore
public class TestHBaseBase {
- protected static EmbeddedHbase hbase;
+ private static final Logger LOGGER = LoggerFactory.getLogger(TestHBaseBase.class);
+ protected static HBaseTestingUtility hbase;
- @BeforeClass
- public static void setUpHBase() {
- hbase = EmbeddedHbase.getInstance();
+ protected static String getZkZnodeParent() {
+ return "/hbase-test";
}
- public static void setupHBaseWithConfig(Configuration config) {
- Assert.assertTrue("HBase test mini cluster should not start", null == hbase);
- hbase = EmbeddedHbase.getInstance(config);
+ @BeforeClass
+ public static void setUpHBase() {
+ Configuration configuration = HBaseConfiguration.create();
+ configuration.set("zookeeper.znode.parent", getZkZnodeParent());
+ configuration.setInt("hbase.master.info.port", -1);//avoid port clobbering
+ configuration.setInt("hbase.regionserver.info.port", -1);//avoid port clobbering
+ hbase = new HBaseTestingUtility(configuration);
+ try {
+ hbase.startMiniCluster();
+ } catch (Exception e) {
+ LOGGER.error("Error to start hbase mini cluster: " + e.getMessage(), e);
+ throw new IllegalStateException(e);
+ }
+ System.setProperty("storage.hbase.autoCreateTable","false");
+ System.setProperty("storage.hbase.zookeeperZnodeParent", getZkZnodeParent());
+ System.setProperty("storage.hbase.zookeeperPropertyClientPort", String.valueOf(hbase.getZkCluster().getClientPort()));
}
@AfterClass
public static void shutdownHBase() {
- if (hbase != null) {
- hbase.shutdown();
+ try {
+ hbase.shutdownMiniCluster();
+ } catch (Exception e) {
+ LOGGER.error("Error to shutdown mini hbase cluster: " + e.getMessage(),e);
+ } finally {
+ hbase = null;
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/ApplicationStatusUpdateService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/ApplicationStatusUpdateService.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/ApplicationStatusUpdateService.java
index 66772ac..d725614 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/ApplicationStatusUpdateService.java
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/ApplicationStatusUpdateService.java
@@ -22,7 +22,5 @@ import org.apache.eagle.metadata.model.ApplicationEntity;
import java.util.Collection;
public abstract class ApplicationStatusUpdateService extends AbstractScheduledService {
- public abstract void updateApplicationEntityStatus(Collection<ApplicationEntity> applicationEntities);
-
public abstract void updateApplicationEntityStatus(ApplicationEntity applicationEntity);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java
index 0411b90..7c79b39 100644
--- a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java
+++ b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java
@@ -19,6 +19,7 @@ package org.apache.eagle.service.client.impl;
import com.sun.jersey.api.client.WebResource;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
+import org.apache.commons.lang3.StringUtils;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
import org.apache.eagle.service.client.EagleServiceClientException;
@@ -65,11 +66,13 @@ public class EagleServiceClientImpl extends EagleServiceBaseClient {
try {
return config.getInt(SERVICE_PORT_KEY);
} catch (ConfigException.WrongType wrongType) {
- return Integer.valueOf(config.getString(SERVICE_PORT_KEY));
+ String portStr = config.getString(SERVICE_PORT_KEY);
+ if (StringUtils.isNotBlank(portStr)) {
+ return Integer.valueOf(portStr);
+ }
}
- } else {
- return 9090;
}
+ return 9090;
}
public EagleServiceClientImpl(String host, int port, String username, String password) {
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-query/eagle-client-base/src/test/java/org/apache/eagle/service/client/ClientTestBase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-client-base/src/test/java/org/apache/eagle/service/client/ClientTestBase.java b/eagle-core/eagle-query/eagle-client-base/src/test/java/org/apache/eagle/service/client/ClientTestBase.java
index ac16b93..adfd2e2 100644
--- a/eagle-core/eagle-query/eagle-client-base/src/test/java/org/apache/eagle/service/client/ClientTestBase.java
+++ b/eagle-core/eagle-query/eagle-client-base/src/test/java/org/apache/eagle/service/client/ClientTestBase.java
@@ -19,7 +19,7 @@ package org.apache.eagle.service.client;
import org.apache.eagle.service.hbase.EmbeddedHbase;
public class ClientTestBase {
-
+
//protected static EmbeddedServer server;
protected static EmbeddedHbase hbase;
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-query/eagle-entity-base/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/pom.xml b/eagle-core/eagle-query/eagle-entity-base/pom.xml
index f887714..fd2300c 100755
--- a/eagle-core/eagle-query/eagle-entity-base/pom.xml
+++ b/eagle-core/eagle-query/eagle-entity-base/pom.xml
@@ -31,20 +31,20 @@
<dependencies>
<!-- put extcos dependency at the top for using asm 4.0 jar !-->
- <dependency>
- <groupId>net.sf.extcos</groupId>
- <artifactId>extcos</artifactId>
- <exclusions>
- <exclusion>
- <groupId>org.ow2.asm</groupId>
- <artifactId>asm-all</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.ow2.asm</groupId>
- <artifactId>asm-all</artifactId>
- </dependency>
+ <!--<dependency>-->
+ <!--<groupId>net.sf.extcos</groupId>-->
+ <!--<artifactId>extcos</artifactId>-->
+ <!--<exclusions>-->
+ <!--<exclusion>-->
+ <!--<groupId>org.ow2.asm</groupId>-->
+ <!--<artifactId>asm-all</artifactId>-->
+ <!--</exclusion>-->
+ <!--</exclusions>-->
+ <!--</dependency>-->
+ <!--<dependency>-->
+ <!--<groupId>org.ow2.asm</groupId>-->
+ <!--<artifactId>asm-all</artifactId>-->
+ <!--</dependency>-->
<dependency>
<groupId>org.apache.eagle</groupId>
<artifactId>eagle-common</artifactId>
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/repo/EntityRepositoryScanner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/repo/EntityRepositoryScanner.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/repo/EntityRepositoryScanner.java
index 7065cbe..8ccee87 100644
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/repo/EntityRepositoryScanner.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/repo/EntityRepositoryScanner.java
@@ -16,55 +16,67 @@
*/
package org.apache.eagle.log.entity.repo;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-
+import org.apache.commons.lang3.time.StopWatch;
+import org.apache.eagle.common.utils.ReflectionsHelper;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.EntitySerDeser;
-import net.sf.extcos.ComponentQuery;
-import net.sf.extcos.ComponentScanner;
-
import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
+import org.apache.eagle.log.entity.meta.EntitySerDeser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collection;
+import java.util.Map;
+
public final class EntityRepositoryScanner {
- private static final Logger LOG = LoggerFactory.getLogger(EntityRepositoryScanner.class);
+ private static final Logger LOG = LoggerFactory.getLogger(EntityRepositoryScanner.class);
- public static void scan() throws InstantiationException, IllegalAccessException {
- // TODO currently extcos 0.3b doesn't support to search packages like "com.*.eagle.*", "org.*.eagle.*". However 0.4b depends on asm-all version 4.0, which is
- // conflicted with jersey server 1.8. We should fix it later
- LOG.info("Scanning all entity repositories with pattern \"org.apache.eagle.*\"");
- final ComponentScanner scanner = new ComponentScanner();
- final Set<Class<?>> classes = scanner.getClasses(new EntityRepoScanQuery() );
- for (Class<?> entityClass : classes) {
- LOG.info("Processing entity repository: " + entityClass.getName());
- if (EntityRepository.class.isAssignableFrom(entityClass)) {
- EntityRepository repo = (EntityRepository)entityClass.newInstance();
- addRepo(repo);
- }
- }
- }
+ // public static void scan() throws InstantiationException, IllegalAccessException {
+ // // TODO currently extcos 0.3b doesn't support to search packages like "com.*.eagle.*", "org.*.eagle.*". However 0.4b depends on asm-all version 4.0, which is
+ // // conflicted with jersey server 1.8. We should fix it later
+ // LOG.info("Scanning all entity repositories with pattern \"org.apache.eagle.*\"");
+ // final ComponentScanner scanner = new ComponentScanner();
+ // final Set<Class<?>> classes = scanner.getClasses(new EntityRepoScanQuery() );
+ // for (Class<?> entityClass : classes) {
+ // LOG.info("Processing entity repository: " + entityClass.getName());
+ // if (EntityRepository.class.isAssignableFrom(entityClass)) {
+ // EntityRepository repo = (EntityRepository)entityClass.newInstance();
+ // addRepo(repo);
+ // }
+ // }
+ // }
- private static void addRepo(EntityRepository repo) {
- final Map<Class<?>, EntitySerDeser<?>> serDeserMap = repo.getSerDeserMap();
- for (Map.Entry<Class<?>, EntitySerDeser<?>> entry : serDeserMap.entrySet()) {
- EntityDefinitionManager.registerSerDeser(entry.getKey(), entry.getValue());
- }
- final Collection<Class<? extends TaggedLogAPIEntity>> entityClasses = repo.getEntitySet();
- for (Class<? extends TaggedLogAPIEntity> clazz : entityClasses) {
- EntityDefinitionManager.registerEntity(clazz);
- }
- }
+ public static void scan() throws IllegalAccessException, InstantiationException {
+ LOG.info("Scanning all entity repositories");
+ StopWatch stopWatch = new StopWatch();
+ stopWatch.start();
+ for (Class<? extends EntityRepository> entityRepoClass : ReflectionsHelper.getInstance().getSubTypesOf(EntityRepository.class)) {
+ if (EntityRepository.class.isAssignableFrom(entityRepoClass)) {
+ EntityRepository repo = entityRepoClass.newInstance();
+ addRepo(repo);
+ }
+ }
+ stopWatch.stop();
+ LOG.info("Finished scanning entity repositories in {} ms", stopWatch.getTime());
+ }
- public static class EntityRepoScanQuery extends ComponentQuery {
+ private static void addRepo(EntityRepository repo) {
+ final Map<Class<?>, EntitySerDeser<?>> serDeserMap = repo.getSerDeserMap();
+ for (Map.Entry<Class<?>, EntitySerDeser<?>> entry : serDeserMap.entrySet()) {
+ EntityDefinitionManager.registerSerDeser(entry.getKey(), entry.getValue());
+ }
+ final Collection<Class<? extends TaggedLogAPIEntity>> entityClasses = repo.getEntitySet();
+ for (Class<? extends TaggedLogAPIEntity> clazz : entityClasses) {
+ EntityDefinitionManager.registerEntity(clazz);
+ }
+ }
- @Override
- protected void query() {
- select().from("org.apache.eagle").returning(
- allExtending(EntityRepository.class));
- }
- }
+ // public static class EntityRepoScanQuery extends ComponentQuery {
+ //
+ // @Override
+ // protected void query() {
+ // select().from("org.apache.eagle").returning(
+ // allExtending(EntityRepository.class));
+ // }
+ // }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/49ca3b0e/eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/TestGenericEntityIndexStreamReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/TestGenericEntityIndexStreamReader.java b/eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/TestGenericEntityIndexStreamReader.java
index 1e9e6cb..33aee32 100755
--- a/eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/TestGenericEntityIndexStreamReader.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/test/java/org/apache/eagle/log/entity/TestGenericEntityIndexStreamReader.java
@@ -26,19 +26,24 @@ import org.apache.eagle.log.entity.test.TestLogAPIEntity;
import org.apache.eagle.query.parser.EagleQueryParser;
import org.apache.eagle.service.hbase.TestHBaseBase;
import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
public class TestGenericEntityIndexStreamReader extends TestHBaseBase {
- @Test
- public void testUniqueIndexRead() throws Exception {
+ @BeforeClass
+ public static void createTable() throws IOException, IllegalAccessException, InstantiationException {
EntityDefinition entityDefinition = EntityDefinitionManager.getEntityDefinitionByEntityClass(TestLogAPIEntity.class);
hbase.createTable(entityDefinition.getTable(), entityDefinition.getColumnFamily());
+ }
+ @Test
+ public void testUniqueIndexRead() throws Exception {
EntityDefinitionManager.registerEntity(TestLogAPIEntity.class);
final EntityDefinition ed = EntityDefinitionManager.getEntityDefinitionByEntityClass(TestLogAPIEntity.class);
@@ -95,7 +100,6 @@ public class TestGenericEntityIndexStreamReader extends TestHBaseBase {
indexReader = new UniqueIndexStreamReader(indexDef, condition);
batchReader = new GenericEntityBatchReader(indexReader);
entities = batchReader.read();
- hbase.deleteTable(entityDefinition.getTable());
Assert.assertNotNull(entities);
Assert.assertTrue(entities.isEmpty());
}
@@ -103,7 +107,7 @@ public class TestGenericEntityIndexStreamReader extends TestHBaseBase {
@Test
public void testNonClusterIndexRead() throws Exception {
EntityDefinition entityDefinition = EntityDefinitionManager.getEntityDefinitionByEntityClass(TestLogAPIEntity.class);
- hbase.createTable(entityDefinition.getTable(), entityDefinition.getColumnFamily());
+ // hbase.createTable(entityDefinition.getTable(), entityDefinition.getColumnFamily());
EntityDefinitionManager.registerEntity(TestLogAPIEntity.class);
final EntityDefinition ed = EntityDefinitionManager.getEntityDefinitionByEntityClass(TestLogAPIEntity.class);
@@ -162,7 +166,7 @@ public class TestGenericEntityIndexStreamReader extends TestHBaseBase {
indexReader = new NonClusteredIndexStreamReader(indexDef, condition);
batchReader = new GenericEntityBatchReader(indexReader);
entities = batchReader.read();
- hbase.deleteTable(entityDefinition.getTable());
+ // hbase.deleteTable(entityDefinition.getTable());
Assert.assertNotNull(entities);
Assert.assertTrue(entities.isEmpty());
}