You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2018/08/23 21:58:55 UTC

[3/4] metron git commit: METRON-1707 Port Profiler to Spark (nickwallen) closes apache/metron#1150

http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/MessageRouterFunctionTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/MessageRouterFunctionTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/MessageRouterFunctionTest.java
new file mode 100644
index 0000000..ceaa7cd
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/MessageRouterFunctionTest.java
@@ -0,0 +1,114 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark.function;
+
+import com.google.common.collect.Lists;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.profiler.MessageRoute;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests the {@link MessageRouterFunction}.
+ */
+public class MessageRouterFunctionTest {
+
+  /**
+   * { "ip_src_addr": "192.168.1.22" }
+   */
+  @Multiline
+  private String goodMessage;
+
+  /**
+   * { "ip_src_addr": "192.168.1.22"
+   */
+  @Multiline
+  private String badMessage;
+
+  /** A well-formed message should be routed to the only profile that is defined. */
+  @Test
+  public void testFindRoutes() throws Exception {
+    List<MessageRoute> routes = findRoutes(oneProfile(), goodMessage);
+
+    Assert.assertEquals(1, routes.size());
+    Assert.assertEquals("profile1", routes.get(0).getProfileDefinition().getProfile());
+  }
+
+  /**
+   * A bad or invalid message should return no routes.
+   */
+  @Test
+  public void testWithBadMessage() throws Exception {
+    List<MessageRoute> routes = findRoutes(oneProfile(), badMessage);
+
+    Assert.assertEquals(0, routes.size());
+  }
+
+  /** A message should get one route per defined profile, in the order they were defined. */
+  @Test
+  public void testFindMultipleRoutes() throws Exception {
+    List<MessageRoute> routes = findRoutes(twoProfiles(), goodMessage);
+
+    Assert.assertEquals(2, routes.size());
+    Assert.assertEquals("profile1", routes.get(0).getProfileDefinition().getProfile());
+    Assert.assertEquals("profile2", routes.get(1).getProfileDefinition().getProfile());
+  }
+
+  /** Runs the function under test on a raw message and collects the routes it returns. */
+  private List<MessageRoute> findRoutes(ProfilerConfig profiles, String message) throws Exception {
+    MessageRouterFunction function = new MessageRouterFunction(profiles, getGlobals());
+    Iterator<MessageRoute> iter = function.call(message);
+    return Lists.newArrayList(iter);
+  }
+
+  /** A profiler configuration that defines only 'profile1'. */
+  private ProfilerConfig oneProfile() {
+    return new ProfilerConfig()
+            .withProfile(countProfile("profile1"));
+  }
+
+  /** A profiler configuration that defines 'profile1' followed by 'profile2'. */
+  private ProfilerConfig twoProfiles() {
+    return new ProfilerConfig()
+            .withProfile(countProfile("profile1"))
+            .withProfile(countProfile("profile2"));
+  }
+
+  /** A named profile per 'ip_src_addr' whose result is a 'count' that each update increments. */
+  private ProfileConfig countProfile(String name) {
+    return new ProfileConfig()
+            .withProfile(name)
+            .withForeach("ip_src_addr")
+            .withUpdate("count", "count + 1")
+            .withResult("count");
+  }
+
+  /** The tests supply no global configuration. */
+  private Map<String, String> getGlobals() {
+    return Collections.emptyMap();
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunctionTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunctionTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunctionTest.java
new file mode 100644
index 0000000..d5a4dba
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunctionTest.java
@@ -0,0 +1,98 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark.function;
+
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.profiler.MessageRoute;
+import org.apache.metron.profiler.ProfilePeriod;
+import org.apache.metron.profiler.spark.ProfileMeasurementAdapter;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION_UNITS;
+
+/** Tests the {@link ProfileBuilderFunction}. */
+public class ProfileBuilderFunctionTest {
+
+  @Test
+  public void testBuildProfile() throws Exception {
+    // setup the message and profile
+    JSONObject message = getMessage();
+    String entity = "192.168.1.1";
+    long timestamp = (Long) message.get("timestamp");
+    ProfileConfig profile = getProfile();
+
+    // setup the routes; the same route is applied three times, so a value of 3 is expected
+    MessageRoute route = new MessageRoute(profile, entity, message, timestamp);
+    List<MessageRoute> routes = new ArrayList<>(Collections.nCopies(3, route));
+    Properties profilerProperties = getProfilerProperties();
+
+    // setup the period
+    int periodDuration = PERIOD_DURATION.get(profilerProperties, Integer.class);
+    TimeUnit periodDurationUnits = TimeUnit.valueOf(PERIOD_DURATION_UNITS.get(profilerProperties, String.class));
+    ProfilePeriod expectedPeriod = ProfilePeriod.fromTimestamp(timestamp, periodDuration, periodDurationUnits);
+
+    // build the profile
+    ProfileBuilderFunction function = new ProfileBuilderFunction(profilerProperties, getGlobals());
+    ProfileMeasurementAdapter measurement = function.call("profile1-192.168.1.1-0", routes.iterator());
+
+    // validate the measurement
+    Assert.assertEquals(entity, measurement.getEntity());
+    Assert.assertEquals(profile.getProfile(), measurement.getProfileName());
+    Assert.assertEquals(routes.size(), measurement.toProfileMeasurement().getProfileValue());
+    Assert.assertEquals(expectedPeriod.getPeriod(), (long) measurement.getPeriodId());
+  }
+
+  /** Creates a message from 192.168.1.1 that is stamped with the current system time. */
+  private JSONObject getMessage() {
+    JSONObject message = new JSONObject();
+    message.put("ip_src_addr", "192.168.1.1");
+    message.put("status", "red");
+    message.put("timestamp", System.currentTimeMillis());
+    return message;
+  }
+
+  /** Supplies an empty set of profiler properties; no setting is overridden. */
+  private Properties getProfilerProperties() {
+    return new Properties();
+  }
+
+  private Map<String, String> getGlobals() {
+    return Collections.emptyMap();  // the test supplies no global configuration
+  }
+
+  /** Defines 'profile1' per 'ip_src_addr'; its result is a 'count' that each update increments. */
+  private ProfileConfig getProfile() {
+    return new ProfileConfig()
+            .withProfile("profile1")
+            .withForeach("ip_src_addr")
+            .withUpdate("count", "count + 1")
+            .withResult("count");
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/test/resources/log4j.properties b/metron-analytics/metron-profiler-spark/src/test/resources/log4j.properties
new file mode 100644
index 0000000..c3db0af
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/test/resources/log4j.properties
@@ -0,0 +1,31 @@
+#
+#
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+#
+
+# Root logger: log ERROR and above to the 'stdout' appender
+log4j.rootLogger=ERROR, stdout
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
+# Profiler loggers are explicitly set to ERROR as well
+log4j.logger.org.apache.metron.profiler=ERROR
+log4j.logger.org.apache.metron.profiler.spark=ERROR
\ No newline at end of file