You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2017/08/07 02:56:59 UTC
eagle git commit: [EAGLE-1059] fix a bug in PolicyResource.java
Repository: eagle
Updated Branches:
refs/heads/master f20bf0eb3 -> ba79b5485
[EAGLE-1059] fix a bug in PolicyResource.java
https://issues.apache.org/jira/browse/EAGLE-1059
Author: Zhao, Qingwen <qi...@apache.org>
Closes #965 from qingwen220/EAGLE-1059.
Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/ba79b548
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/ba79b548
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/ba79b548
Branch: refs/heads/master
Commit: ba79b5485622b9de92373d612631d1b96a245287
Parents: f20bf0e
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Mon Aug 7 10:56:55 2017 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Mon Aug 7 10:56:55 2017 +0800
----------------------------------------------------------------------
.../impl/AlertBoltOutputCollectorWrapper.java | 38 ++++++++------
.../eagle/metadata/resource/PolicyResource.java | 24 +++++++++
.../metadata/utils/PolicyIdConversions.java | 20 ++++---
.../dev/partials/alert/policyPrototypes.html | 55 ++++++++++----------
.../app/dev/public/js/ctrls/alertEditCtrl.js | 37 +++++++------
5 files changed, 103 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/eagle/blob/ba79b548/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
index 606ddce..4c749f4 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
@@ -49,28 +49,32 @@ public class AlertBoltOutputCollectorWrapper implements AlertStreamCollector {
@Override
public void emit(AlertStreamEvent event) {
+ if (event == null) {
+ return;
+ }
+ event.ensureAlertId();
Set<PublishPartition> clonedPublishPartitions = new HashSet<>(publishPartitions);
for (PublishPartition publishPartition : clonedPublishPartitions) {
// skip the publish partition which is not belong to this policy and also check streamId
PublishPartition cloned = publishPartition.clone();
Optional.ofNullable(event)
- .filter(x -> x != null
- && x.getSchema() != null
- && cloned.getPolicyId().equalsIgnoreCase(x.getPolicyId())
- && (cloned.getStreamId().equalsIgnoreCase(x.getSchema().getStreamId())
- || cloned.getStreamId().equalsIgnoreCase(Publishment.STREAM_NAME_DEFAULT)))
- .ifPresent(x -> {
- cloned.getColumns().stream()
- .filter(y -> event.getSchema().getColumnIndex(y) >= 0
- && event.getSchema().getColumnIndex(y) < event.getSchema().getColumns().size())
- .map(y -> event.getData()[event.getSchema().getColumnIndex(y)])
- .filter(y -> y != null)
- .forEach(y -> cloned.getColumnValues().add(y));
- synchronized (outputLock) {
- streamContext.counter().incr("alert_count");
- delegate.emit(Arrays.asList(cloned, event));
- }
- });
+ .filter(x -> x != null
+ && x.getSchema() != null
+ && cloned.getPolicyId().equalsIgnoreCase(x.getPolicyId())
+ && (cloned.getStreamId().equalsIgnoreCase(x.getSchema().getStreamId())
+ || cloned.getStreamId().equalsIgnoreCase(Publishment.STREAM_NAME_DEFAULT)))
+ .ifPresent(x -> {
+ cloned.getColumns().stream()
+ .filter(y -> event.getSchema().getColumnIndex(y) >= 0
+ && event.getSchema().getColumnIndex(y) < event.getSchema().getColumns().size())
+ .map(y -> event.getData()[event.getSchema().getColumnIndex(y)])
+ .filter(y -> y != null)
+ .forEach(y -> cloned.getColumnValues().add(y));
+ synchronized (outputLock) {
+ streamContext.counter().incr("alert_count");
+ delegate.emit(Arrays.asList(cloned, event));
+ }
+ });
}
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/ba79b548/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/resource/PolicyResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/resource/PolicyResource.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/resource/PolicyResource.java
index d09da4b..9edfc3e 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/resource/PolicyResource.java
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/resource/PolicyResource.java
@@ -21,6 +21,7 @@ import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
import org.apache.eagle.alert.engine.interpreter.PolicyValidationResult;
import org.apache.eagle.alert.metadata.resource.OpResult;
import org.apache.eagle.common.rest.RESTResponse;
@@ -72,6 +73,7 @@ public class PolicyResource {
Preconditions.checkNotNull(policyEntity.getAlertPublishmentIds(), "alert publisher list should not be null");
PolicyDefinition policyDefinition = policyEntity.getDefinition();
+ checkOutputStream(policyDefinition.getInputStreams(), policyDefinition.getOutputStreams());
OpResult result = metadataResource.addPolicy(policyDefinition);
if (result.code != 200) {
throw new IllegalArgumentException(result.message);
@@ -153,6 +155,7 @@ public class PolicyResource {
private PolicyEntity importPolicyProto(PolicyEntity policyEntity) {
PolicyDefinition policyDefinition = policyEntity.getDefinition();
+ checkOutputStream(policyDefinition.getInputStreams(), policyDefinition.getOutputStreams());
List<String> inputStreamType = new ArrayList<>();
String newDefinition = policyDefinition.getDefinition().getValue();
for (String inputStream : policyDefinition.getInputStreams()) {
@@ -164,6 +167,7 @@ public class PolicyResource {
policyDefinition.getDefinition().setValue(newDefinition);
policyDefinition.setName(PolicyIdConversions.parsePolicyId(policyDefinition.getSiteId(), policyDefinition.getName()));
policyDefinition.setSiteId(null);
+ policyDefinition.getPartitionSpec().clear();
policyEntity.setDefinition(policyDefinition);
return policyEntityService.createOrUpdatePolicyProto(policyEntity);
}
@@ -186,10 +190,19 @@ public class PolicyResource {
policyDefinition.getDefinition().setValue(newDefinition);
policyDefinition.setSiteId(site);
policyDefinition.setName(PolicyIdConversions.generateUniquePolicyId(site, policyProto.getDefinition().getName()));
+
PolicyValidationResult validationResult = metadataResource.validatePolicy(policyDefinition);
if (!validationResult.isSuccess() || validationResult.getException() != null) {
throw new IllegalArgumentException(validationResult.getException());
}
+
+ policyDefinition.getPartitionSpec().clear();
+ for (StreamPartition sd : validationResult.getPolicyExecutionPlan().getStreamPartitions()) {
+ if (inputStreams.contains(sd.getStreamId())) {
+ policyDefinition.getPartitionSpec().add(sd);
+ }
+ }
+
OpResult result = metadataResource.addPolicy(policyDefinition);
if (result.code != 200) {
throw new IllegalArgumentException("fail to create policy: " + result.message);
@@ -204,4 +217,15 @@ public class PolicyResource {
return true;
}
+ private void checkOutputStream(List<String> inputStreams, List<String> outputStreams) {
+ for (String inputStream : inputStreams) {
+ for (String outputStream : outputStreams) {
+ if (outputStream.contains(inputStream)) {
+ throw new IllegalArgumentException("OutputStream name should not contains string: " + inputStream
+ + ". Please rename your OutputStream name");
+ }
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/ba79b548/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/utils/PolicyIdConversions.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/utils/PolicyIdConversions.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/utils/PolicyIdConversions.java
index c9ccadc..2012a40 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/utils/PolicyIdConversions.java
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/utils/PolicyIdConversions.java
@@ -22,17 +22,21 @@ import com.google.common.base.Preconditions;
public class PolicyIdConversions {
public static String generateUniquePolicyId(String siteId, String policyName) {
- return String.format("%s_%s", policyName, siteId);
+ String subffix = String.format("_%s", siteId.toLowerCase());
+ if (policyName.toLowerCase().endsWith(subffix)) {
+ return policyName;
+ }
+ return String.format("%s_%s", policyName, siteId.toLowerCase());
}
- public static String parsePolicyId(String siteId, String generatedUniquePolicyId) {
- String subffix = String.format("_%s", siteId);
- if (generatedUniquePolicyId.endsWith(subffix)) {
- int streamTypeIdLength = generatedUniquePolicyId.length() - subffix.length();
- Preconditions.checkArgument(streamTypeIdLength > 0, "Invalid policyId: " + generatedUniquePolicyId + ", policyId is empty");
- return generatedUniquePolicyId.substring(0, streamTypeIdLength);
+ public static String parsePolicyId(String siteId, String policyName) {
+ String subffix = String.format("_%s", siteId.toLowerCase());
+ if (policyName.toLowerCase().endsWith(subffix)) {
+ int streamTypeIdLength = policyName.length() - subffix.length();
+ Preconditions.checkArgument(streamTypeIdLength > 0, "Invalid policyId: " + policyName + ", policyId is empty");
+ return policyName.substring(0, streamTypeIdLength);
} else {
- return generatedUniquePolicyId;
+ return policyName;
}
}
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/ba79b548/eagle-server/src/main/webapp/app/dev/partials/alert/policyPrototypes.html
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/webapp/app/dev/partials/alert/policyPrototypes.html b/eagle-server/src/main/webapp/app/dev/partials/alert/policyPrototypes.html
index 9f22ce4..0798475 100644
--- a/eagle-server/src/main/webapp/app/dev/partials/alert/policyPrototypes.html
+++ b/eagle-server/src/main/webapp/app/dev/partials/alert/policyPrototypes.html
@@ -27,35 +27,36 @@
<div sort-table="prototypeList">
<table class="table table-bordered table-hover">
<thead>
- <tr>
- <th>
- <input type="checkbox" ng-checked="getCheckedList().length === prototypeList.length && prototypeList.length !== 0" ng-click="doCheckAll()" />
- </th>
- <th>Name</th>
- <th>Definition</th>
- <th>Publishers</th>
- <th width="110">Operation</th>
- </tr>
+ <tr>
+ <th>
+ <input type="checkbox" ng-checked="getCheckedList().length === prototypeList.length && prototypeList.length !== 0" ng-click="doCheckAll()" />
+ </th>
+ <th>Name</th>
+ <th>Definition</th>
+ <!--<th>Publishers</th> -->
+ <th width="110">Operation</th>
+ </tr>
</thead>
<tbody>
- <tr>
- <td>
- <input type="checkbox" ng-checked="checkedPrototypes[item.name]" ng-click="checkedPrototypes[item.name] = !checkedPrototypes[item.name]" />
- </td>
- <td>{{item.name}}</td>
- <td><pre>{{item.definition.definition.value}}</pre></td>
- <td>
- <ul class="no-margin">
- <li ng-repeat="publisher in item.alertPublishmentIds track by $index">
- {{publisher}}
- </li>
- </ul>
- </td>
- <td class="text-center">
- <button class="btn btn-xs btn-primary" ng-click="createPolicy([item])">Export</button>
- <button class="btn btn-xs btn-danger" ng-click="deletePrototype(item)">Delete</button>
- </td>
- </tr>
+ <tr>
+ <td>
+ <input type="checkbox" ng-checked="checkedPrototypes[item.name]" ng-click="checkedPrototypes[item.name] = !checkedPrototypes[item.name]" />
+ </td>
+ <td>{{item.name}}</td>
+ <td><pre>{{item.definition.definition.value}}</pre></td>
+ <!--
+ <td>
+ <ul class="no-margin">
+ <li ng-repeat="publisher in item.alertPublishmentIds track by $index">
+ {{publisher}}
+ </li>
+ </ul>
+ </td> -->
+ <td class="text-center">
+ <button class="btn btn-xs btn-primary" ng-click="createPolicy([item])">Export</button>
+ <button class="btn btn-xs btn-danger" ng-click="deletePrototype(item)">Delete</button>
+ </td>
+ </tr>
</tbody>
</table>
</div>
http://git-wip-us.apache.org/repos/asf/eagle/blob/ba79b548/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js b/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js
index e4623b2..fc2b494 100644
--- a/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js
+++ b/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js
@@ -253,12 +253,12 @@
};
/*$scope.checkInputStream = function (streamId) {
- if($scope.isInputStreamSelected(streamId)) {
- $scope.policy.inputStreams = common.array.remove(streamId, $scope.policy.inputStreams);
- } else {
- $scope.policy.inputStreams.push(streamId);
- }
- };*/
+ if($scope.isInputStreamSelected(streamId)) {
+ $scope.policy.inputStreams = common.array.remove(streamId, $scope.policy.inputStreams);
+ } else {
+ $scope.policy.inputStreams.push(streamId);
+ }
+ };*/
// ==============================================================
// = Definition =
@@ -558,7 +558,18 @@
});
}
- policyPromise._then(function () {
+ policyPromise._then(function (res) {
+ var validate = res.data;
+ if (!validate.success) {
+ $.dialog({
+ title: "OPS",
+ content: "Create policy failed: " + (res.data.message || res.data.errors)
+ });
+ $scope.policyLock = false;
+ $scope.saveLock = false;
+ return;
+ }
+
console.log("Create policy success...");
$.dialog({
title: "Done",
@@ -566,18 +577,6 @@
}, function () {
$wrapState.go("policyDetail", {name: $scope.policy.name, siteId: $scope.policy.siteId});
});
- }, function (res) {
- var errormsg = "";
- if(typeof res.data.message !== 'undefined') {
- errormsg = res.data.message;
- } else {
- errormsg = res.data.errors;
- }
- $.dialog({
- title: "OPS",
- content: "Create policy failed: " + errormsg
- });
- $scope.policyLock = false;
});
}, function (args) {
$.dialog({