You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2017/08/07 02:56:59 UTC

eagle git commit: [EAGLE-1059] fix a bug in PolicyResource.java

Repository: eagle
Updated Branches:
  refs/heads/master f20bf0eb3 -> ba79b5485


[EAGLE-1059] fix a bug in PolicyResource.java

https://issues.apache.org/jira/browse/EAGLE-1059

Author: Zhao, Qingwen <qi...@apache.org>

Closes #965 from qingwen220/EAGLE-1059.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/ba79b548
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/ba79b548
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/ba79b548

Branch: refs/heads/master
Commit: ba79b5485622b9de92373d612631d1b96a245287
Parents: f20bf0e
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Mon Aug 7 10:56:55 2017 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Mon Aug 7 10:56:55 2017 +0800

----------------------------------------------------------------------
 .../impl/AlertBoltOutputCollectorWrapper.java   | 38 ++++++++------
 .../eagle/metadata/resource/PolicyResource.java | 24 +++++++++
 .../metadata/utils/PolicyIdConversions.java     | 20 ++++---
 .../dev/partials/alert/policyPrototypes.html    | 55 ++++++++++----------
 .../app/dev/public/js/ctrls/alertEditCtrl.js    | 37 +++++++------
 5 files changed, 103 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/ba79b548/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
index 606ddce..4c749f4 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
@@ -49,28 +49,32 @@ public class AlertBoltOutputCollectorWrapper implements AlertStreamCollector {
 
     @Override
     public void emit(AlertStreamEvent event) {
+        if (event == null) {
+            return;
+        }
+        event.ensureAlertId();
         Set<PublishPartition> clonedPublishPartitions = new HashSet<>(publishPartitions);
         for (PublishPartition publishPartition : clonedPublishPartitions) {
             // skip the publish partition which is not belong to this policy and also check streamId
             PublishPartition cloned = publishPartition.clone();
             Optional.ofNullable(event)
-                .filter(x -> x != null
-                    && x.getSchema() != null
-                    && cloned.getPolicyId().equalsIgnoreCase(x.getPolicyId())
-                    && (cloned.getStreamId().equalsIgnoreCase(x.getSchema().getStreamId())
-                    || cloned.getStreamId().equalsIgnoreCase(Publishment.STREAM_NAME_DEFAULT)))
-                .ifPresent(x -> {
-                    cloned.getColumns().stream()
-                        .filter(y -> event.getSchema().getColumnIndex(y) >= 0
-                            && event.getSchema().getColumnIndex(y) < event.getSchema().getColumns().size())
-                        .map(y -> event.getData()[event.getSchema().getColumnIndex(y)])
-                        .filter(y -> y != null)
-                        .forEach(y -> cloned.getColumnValues().add(y));
-                    synchronized (outputLock) {
-                        streamContext.counter().incr("alert_count");
-                        delegate.emit(Arrays.asList(cloned, event));
-                    }
-                });
+                    .filter(x -> x != null
+                            && x.getSchema() != null
+                            && cloned.getPolicyId().equalsIgnoreCase(x.getPolicyId())
+                            && (cloned.getStreamId().equalsIgnoreCase(x.getSchema().getStreamId())
+                            || cloned.getStreamId().equalsIgnoreCase(Publishment.STREAM_NAME_DEFAULT)))
+                    .ifPresent(x -> {
+                        cloned.getColumns().stream()
+                                .filter(y -> event.getSchema().getColumnIndex(y) >= 0
+                                        && event.getSchema().getColumnIndex(y) < event.getSchema().getColumns().size())
+                                .map(y -> event.getData()[event.getSchema().getColumnIndex(y)])
+                                .filter(y -> y != null)
+                                .forEach(y -> cloned.getColumnValues().add(y));
+                        synchronized (outputLock) {
+                            streamContext.counter().incr("alert_count");
+                            delegate.emit(Arrays.asList(cloned, event));
+                        }
+                    });
         }
     }
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/ba79b548/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/resource/PolicyResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/resource/PolicyResource.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/resource/PolicyResource.java
index d09da4b..9edfc3e 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/resource/PolicyResource.java
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/resource/PolicyResource.java
@@ -21,6 +21,7 @@ import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
 import org.apache.eagle.alert.engine.interpreter.PolicyValidationResult;
 import org.apache.eagle.alert.metadata.resource.OpResult;
 import org.apache.eagle.common.rest.RESTResponse;
@@ -72,6 +73,7 @@ public class PolicyResource {
             Preconditions.checkNotNull(policyEntity.getAlertPublishmentIds(), "alert publisher list should not be null");
 
             PolicyDefinition policyDefinition = policyEntity.getDefinition();
+            checkOutputStream(policyDefinition.getInputStreams(), policyDefinition.getOutputStreams());
             OpResult result = metadataResource.addPolicy(policyDefinition);
             if (result.code != 200) {
                 throw new IllegalArgumentException(result.message);
@@ -153,6 +155,7 @@ public class PolicyResource {
 
     private PolicyEntity importPolicyProto(PolicyEntity policyEntity) {
         PolicyDefinition policyDefinition = policyEntity.getDefinition();
+        checkOutputStream(policyDefinition.getInputStreams(), policyDefinition.getOutputStreams());
         List<String> inputStreamType = new ArrayList<>();
         String newDefinition = policyDefinition.getDefinition().getValue();
         for (String inputStream : policyDefinition.getInputStreams()) {
@@ -164,6 +167,7 @@ public class PolicyResource {
         policyDefinition.getDefinition().setValue(newDefinition);
         policyDefinition.setName(PolicyIdConversions.parsePolicyId(policyDefinition.getSiteId(), policyDefinition.getName()));
         policyDefinition.setSiteId(null);
+        policyDefinition.getPartitionSpec().clear();
         policyEntity.setDefinition(policyDefinition);
         return policyEntityService.createOrUpdatePolicyProto(policyEntity);
     }
@@ -186,10 +190,19 @@ public class PolicyResource {
             policyDefinition.getDefinition().setValue(newDefinition);
             policyDefinition.setSiteId(site);
             policyDefinition.setName(PolicyIdConversions.generateUniquePolicyId(site, policyProto.getDefinition().getName()));
+
             PolicyValidationResult validationResult = metadataResource.validatePolicy(policyDefinition);
             if (!validationResult.isSuccess() || validationResult.getException() != null) {
                 throw new IllegalArgumentException(validationResult.getException());
             }
+
+            policyDefinition.getPartitionSpec().clear();
+            for (StreamPartition sd : validationResult.getPolicyExecutionPlan().getStreamPartitions()) {
+                if (inputStreams.contains(sd.getStreamId())) {
+                    policyDefinition.getPartitionSpec().add(sd);
+                }
+            }
+
             OpResult result = metadataResource.addPolicy(policyDefinition);
             if (result.code != 200) {
                 throw new IllegalArgumentException("fail to create policy: " + result.message);
@@ -204,4 +217,15 @@ public class PolicyResource {
         return true;
     }
 
+    private void checkOutputStream(List<String> inputStreams, List<String> outputStreams) {
+        for (String inputStream : inputStreams) {
+            for (String outputStream : outputStreams) {
+                if (outputStream.contains(inputStream)) {
+                    throw new IllegalArgumentException("OutputStream name should not contains string: " + inputStream
+                            + ". Please rename your OutputStream name");
+                }
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/ba79b548/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/utils/PolicyIdConversions.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/utils/PolicyIdConversions.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/utils/PolicyIdConversions.java
index c9ccadc..2012a40 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/utils/PolicyIdConversions.java
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/utils/PolicyIdConversions.java
@@ -22,17 +22,21 @@ import com.google.common.base.Preconditions;
 public class PolicyIdConversions {
 
     public static String generateUniquePolicyId(String siteId, String policyName) {
-        return String.format("%s_%s", policyName, siteId);
+        String subffix = String.format("_%s", siteId.toLowerCase());
+        if (policyName.toLowerCase().endsWith(subffix)) {
+            return policyName;
+        }
+        return String.format("%s_%s", policyName, siteId.toLowerCase());
     }
 
-    public static String parsePolicyId(String siteId, String generatedUniquePolicyId) {
-        String subffix = String.format("_%s", siteId);
-        if (generatedUniquePolicyId.endsWith(subffix)) {
-            int streamTypeIdLength = generatedUniquePolicyId.length() - subffix.length();
-            Preconditions.checkArgument(streamTypeIdLength > 0, "Invalid policyId: " + generatedUniquePolicyId + ", policyId is empty");
-            return generatedUniquePolicyId.substring(0, streamTypeIdLength);
+    public static String parsePolicyId(String siteId, String policyName) {
+        String subffix = String.format("_%s", siteId.toLowerCase());
+        if (policyName.toLowerCase().endsWith(subffix)) {
+            int streamTypeIdLength = policyName.length() - subffix.length();
+            Preconditions.checkArgument(streamTypeIdLength > 0, "Invalid policyId: " + policyName + ", policyId is empty");
+            return policyName.substring(0, streamTypeIdLength);
         } else {
-            return generatedUniquePolicyId;
+            return policyName;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/ba79b548/eagle-server/src/main/webapp/app/dev/partials/alert/policyPrototypes.html
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/webapp/app/dev/partials/alert/policyPrototypes.html b/eagle-server/src/main/webapp/app/dev/partials/alert/policyPrototypes.html
index 9f22ce4..0798475 100644
--- a/eagle-server/src/main/webapp/app/dev/partials/alert/policyPrototypes.html
+++ b/eagle-server/src/main/webapp/app/dev/partials/alert/policyPrototypes.html
@@ -27,35 +27,36 @@
 		<div sort-table="prototypeList">
 			<table class="table table-bordered table-hover">
 				<thead>
-					<tr>
-						<th>
-							<input type="checkbox" ng-checked="getCheckedList().length === prototypeList.length && prototypeList.length !== 0" ng-click="doCheckAll()" />
-						</th>
-						<th>Name</th>
-						<th>Definition</th>
-						<th>Publishers</th>
-						<th width="110">Operation</th>
-					</tr>
+				<tr>
+					<th>
+						<input type="checkbox" ng-checked="getCheckedList().length === prototypeList.length && prototypeList.length !== 0" ng-click="doCheckAll()" />
+					</th>
+					<th>Name</th>
+					<th>Definition</th>
+					<!--<th>Publishers</th> -->
+					<th width="110">Operation</th>
+				</tr>
 				</thead>
 				<tbody>
-					<tr>
-						<td>
-							<input type="checkbox" ng-checked="checkedPrototypes[item.name]" ng-click="checkedPrototypes[item.name] = !checkedPrototypes[item.name]" />
-						</td>
-						<td>{{item.name}}</td>
-						<td><pre>{{item.definition.definition.value}}</pre></td>
-						<td>
-							<ul class="no-margin">
-								<li ng-repeat="publisher in item.alertPublishmentIds track by $index">
-									{{publisher}}
-								</li>
-							</ul>
-						</td>
-						<td class="text-center">
-							<button class="btn btn-xs btn-primary" ng-click="createPolicy([item])">Export</button>
-							<button class="btn btn-xs btn-danger" ng-click="deletePrototype(item)">Delete</button>
-						</td>
-					</tr>
+				<tr>
+					<td>
+						<input type="checkbox" ng-checked="checkedPrototypes[item.name]" ng-click="checkedPrototypes[item.name] = !checkedPrototypes[item.name]" />
+					</td>
+					<td>{{item.name}}</td>
+					<td><pre>{{item.definition.definition.value}}</pre></td>
+					<!--
+					<td>
+						<ul class="no-margin">
+							<li ng-repeat="publisher in item.alertPublishmentIds track by $index">
+								{{publisher}}
+							</li>
+						</ul>
+					</td> -->
+					<td class="text-center">
+						<button class="btn btn-xs btn-primary" ng-click="createPolicy([item])">Export</button>
+						<button class="btn btn-xs btn-danger" ng-click="deletePrototype(item)">Delete</button>
+					</td>
+				</tr>
 				</tbody>
 			</table>
 		</div>

http://git-wip-us.apache.org/repos/asf/eagle/blob/ba79b548/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js b/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js
index e4623b2..fc2b494 100644
--- a/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js
+++ b/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js
@@ -253,12 +253,12 @@
 		};
 
 		/*$scope.checkInputStream = function (streamId) {
-			if($scope.isInputStreamSelected(streamId)) {
-				$scope.policy.inputStreams = common.array.remove(streamId, $scope.policy.inputStreams);
-			} else {
-				$scope.policy.inputStreams.push(streamId);
-			}
-		};*/
+		 if($scope.isInputStreamSelected(streamId)) {
+		 $scope.policy.inputStreams = common.array.remove(streamId, $scope.policy.inputStreams);
+		 } else {
+		 $scope.policy.inputStreams.push(streamId);
+		 }
+		 };*/
 
 		// ==============================================================
 		// =                         Definition                         =
@@ -558,7 +558,18 @@
 						});
 					}
 
-					policyPromise._then(function () {
+					policyPromise._then(function (res) {
+						var validate = res.data;
+						if (!validate.success) {
+							$.dialog({
+								title: "OPS",
+								content: "Create policy failed: " + (res.data.message || res.data.errors)
+							});
+							$scope.policyLock = false;
+							$scope.saveLock = false;
+							return;
+						}
+
 						console.log("Create policy success...");
 						$.dialog({
 							title: "Done",
@@ -566,18 +577,6 @@
 						}, function () {
 							$wrapState.go("policyDetail", {name: $scope.policy.name, siteId: $scope.policy.siteId});
 						});
-					}, function (res) {
-						var errormsg = "";
-						if(typeof res.data.message !== 'undefined') {
-							errormsg = res.data.message;
-						} else {
-							errormsg = res.data.errors;
-						}
-						$.dialog({
-							title: "OPS",
-							content: "Create policy failed: " + errormsg
-						});
-						$scope.policyLock = false;
 					});
 				}, function (args) {
 					$.dialog({