You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2016/12/05 16:37:50 UTC

incubator-metron git commit: METRON-597 Sporadic Failures of Profiler Integration Tests (nickwallen) closes apache/incubator-metron#383

Repository: incubator-metron
Updated Branches:
  refs/heads/master 2fd7f95c2 -> 354cb6ddc


METRON-597 Sporadic Failures of Profiler Integration Tests (nickwallen) closes apache/incubator-metron#383


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/354cb6dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/354cb6dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/354cb6dd

Branch: refs/heads/master
Commit: 354cb6ddce7f5313fb140fd3ab8521f9dce0c35f
Parents: 2fd7f95
Author: nickwallen <ni...@nickallen.org>
Authored: Mon Dec 5 11:37:16 2016 -0500
Committer: Nick Allen <ni...@nickallen.org>
Committed: Mon Dec 5 11:37:16 2016 -0500

----------------------------------------------------------------------
 .../integration/ProfilerIntegrationTest.java    | 71 ++++++++++----------
 .../org/apache/metron/test/mock/MockHTable.java | 48 ++++++++++---
 2 files changed, 77 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/354cb6dd/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
index fc1c24f..e09f7f1 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
@@ -21,11 +21,12 @@
 package org.apache.metron.profiler.integration;
 
 import org.adrianwalker.multilinestring.Multiline;
+import org.apache.commons.lang.math.NumberUtils;
 import org.apache.commons.math.util.MathUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.spout.kafka.SpoutConfig;
@@ -109,6 +110,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
 
   private static final String tableName = "profiler";
   private static final String columnFamily = "P";
+  private static final double epsilon = 0.001;
 
   /**
    * A TableProvider that allows us to mock HBase.
@@ -140,12 +142,12 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
             timeout(seconds(90)));
 
     // verify - only 10.0.0.2 sends 'HTTP', thus there should be only 1 value
-    List<Double> actuals = read(columnBuilder.getColumnQualifier("value"), Double.class);
-    Assert.assertEquals(1, actuals.size());
+    List<Double> actuals = read(profilerTable.getPutLog(), columnFamily, columnBuilder.getColumnQualifier("value"), Double.class);
 
     // verify - there are 5 'HTTP' each with 390 bytes
-    double actual = actuals.get(0);
-    Assert.assertEquals(390.0 * 5, actual, 0.01);
+    Assert.assertTrue(actuals.stream().anyMatch(val ->
+            MathUtils.equals(390.0 * 5, val, epsilon)
+    ));
   }
 
   /**
@@ -168,14 +170,17 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
             timeout(seconds(90)));
 
     // verify - expect 2 results as 2 hosts involved; 10.0.0.2 sends 'HTTP' and 10.0.0.3 send 'DNS'
-    List<Double> actuals = read(columnBuilder.getColumnQualifier("value"), Double.class);
-    Assert.assertEquals(expected, actuals.size());
+    List<Double> actuals = read(profilerTable.getPutLog(), columnFamily, columnBuilder.getColumnQualifier("value"), Double.class);
 
     // verify - 10.0.0.3 -> 1/6
-    Assert.assertTrue(actuals.stream().anyMatch(val -> MathUtils.equals(val, 1.0/6.0, 0.1)));
+    Assert.assertTrue(actuals.stream().anyMatch(val ->
+            MathUtils.equals(val, 1.0/6.0, epsilon)
+    ));
 
     // verify - 10.0.0.2 -> 6/1
-    Assert.assertTrue(actuals.stream().anyMatch(val -> MathUtils.equals(val, 6.0/1.0, 0.1)));
+    Assert.assertTrue(actuals.stream().anyMatch(val ->
+            MathUtils.equals(val, 6.0/1.0, epsilon)
+    ));
   }
 
   /**
@@ -195,12 +200,12 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
             timeout(seconds(90)));
 
     // verify - only 10.0.0.2 sends 'HTTP', thus there should be only 1 value
-    List<Double> actuals = read(columnBuilder.getColumnQualifier("value"), Double.class);
-    Assert.assertEquals(1, actuals.size());
+    List<Double> actuals = read(profilerTable.getPutLog(), columnFamily, columnBuilder.getColumnQualifier("value"), Double.class);
 
     // verify - there are 5 'HTTP' messages each with a length of 20, thus the average should be 20
-    double actual = actuals.get(0);
-    Assert.assertEquals(20.0, actual, 0.01);
+    Assert.assertTrue(actuals.stream().anyMatch(val ->
+            MathUtils.equals(val, 20.0, epsilon)
+    ));
   }
 
   @Test
@@ -220,11 +225,13 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
             timeout(seconds(90)));
 
     // verify - the profile sees messages from 3 hosts; 10.0.0.[1-3]
-    List<Integer> actuals = read(columnBuilder.getColumnQualifier("value"), Integer.class);
+    List<Integer> actuals = read(profilerTable.getPutLog(), columnFamily, columnBuilder.getColumnQualifier("value"), Integer.class);
     Assert.assertEquals(3, actuals.size());
 
     // verify - the profile writes 10 as an integer
-    actuals.forEach(actual -> Assert.assertEquals(10.0, actual, 0.01));
+    Assert.assertTrue(actuals.stream().anyMatch(val ->
+            MathUtils.equals(val, 10.0, epsilon)
+    ));
   }
 
   @Test
@@ -240,32 +247,28 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
     waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
             timeout(seconds(90)));
 
-    List<Double> actuals = read(columnBuilder.getColumnQualifier("value"), Double.class);
-
-    // verify - the profile only cares about HTTP and only 1 host sends HTTP
-    Assert.assertEquals(1, actuals.size());
+    List<Double> actuals = read(profilerTable.getPutLog(), columnFamily, columnBuilder.getColumnQualifier("value"), Double.class);
 
     // verify - the 70th percentile of 5 x 20s = 20.0
-    actuals.forEach(actual -> Assert.assertEquals(20.0, actual, 0.01));
+    Assert.assertTrue(actuals.stream().anyMatch(val ->
+            MathUtils.equals(val, 20.0, epsilon)));
   }
 
   /**
    * Reads a value written by the Profiler.
-   *
-   * @param column The column qualifier.
-   * @param clazz The expected type of the result.
-   * @param <T> The expected type of the result.
-   * @return The value contained within the column.
+   * @param family The column family.
+   * @param qualifier The column qualifier.
+   * @param clazz The expected type of the value.
+   * @param <T> The expected type of the value.
+   * @return The value written by the Profiler.
    */
-  private <T> List<T> read(byte[] column, Class<T> clazz) throws IOException {
+  private <T> List<T> read(List<Put> puts, String family, byte[] qualifier, Class<T> clazz) {
     List<T> results = new ArrayList<>();
 
-    final byte[] cf = Bytes.toBytes(columnFamily);
-    ResultScanner scanner = profilerTable.getScanner(cf, column);
-    for (Result result : scanner.next(10)) {
-      if (result.containsColumn(cf, column)) {
-        byte[] raw = result.getValue(cf, column);
-        results.add(SerDeUtils.fromBytes(raw, clazz));
+    for(Put put: puts) {
+      for(Cell cell: put.get(Bytes.toBytes(family), qualifier)) {
+        T value = SerDeUtils.fromBytes(cell.getValue(), clazz);
+        results.add(value);
       }
     }
 
@@ -288,7 +291,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
       setProperty("profiler.workers", "1");
       setProperty("profiler.executors", "0");
       setProperty("profiler.input.topic", Constants.INDEXING_TOPIC);
-      setProperty("profiler.period.duration", "5");
+      setProperty("profiler.period.duration", "20");
       setProperty("profiler.period.duration.units", "SECONDS");
       setProperty("profiler.ttl", "30");
       setProperty("profiler.ttl.units", "MINUTES");

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/354cb6dd/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
index 5f2f855..1177c75 100644
--- a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
+++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
@@ -18,6 +18,7 @@
 package org.apache.metron.test.mock;
 
 
+import com.google.common.collect.ImmutableList;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Message;
 import com.google.protobuf.Service;
@@ -28,7 +29,18 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.Filter;
@@ -37,7 +49,16 @@ import org.apache.hadoop.hbase.util.Bytes;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
 
 /**
  * MockHTable.
@@ -53,9 +74,11 @@ public class MockHTable implements HTableInterface {
       HTableInterface ret = _cache.get(tableName);
       return ret;
     }
+
     public static HTableInterface getFromCache(String tableName) {
       return _cache.get(tableName);
     }
+
     public static HTableInterface addToCache(String tableName, String... columnFamilies) {
       MockHTable ret =  new MockHTable(tableName, columnFamilies);
       _cache.put(tableName, ret);
@@ -70,7 +93,7 @@ public class MockHTable implements HTableInterface {
   private final String tableName;
   private final List<String> columnFamilies = new ArrayList<>();
   private HColumnDescriptor[] descriptors;
-
+  private List<Put> putLog;
   private NavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> data
           = new TreeMap<>(Bytes.BYTES_COMPARATOR);
 
@@ -99,10 +122,11 @@ public class MockHTable implements HTableInterface {
   }
   public MockHTable(String tableName) {
     this.tableName = tableName;
+    this.putLog = new ArrayList<>();
   }
 
   public MockHTable(String tableName, String... columnFamilies) {
-    this.tableName = tableName;
+    this(tableName);
     for(String cf : columnFamilies) {
       addColumnFamily(cf);
     }
@@ -111,6 +135,7 @@ public class MockHTable implements HTableInterface {
   public int size() {
     return data.size();
   }
+
   public void addColumnFamily(String columnFamily) {
     this.columnFamilies.add(columnFamily);
     descriptors = new HColumnDescriptor[columnFamilies.size()];
@@ -440,15 +465,22 @@ public class MockHTable implements HTableInterface {
     return getScanner(scan);
   }
 
-  List<Put> putLog = new ArrayList<>();
-
   public List<Put> getPutLog() {
-    return putLog;
+    synchronized (putLog) {
+      return ImmutableList.copyOf(putLog);
+    }
+  }
+
+  public void addToPutLog(Put put) {
+    synchronized(putLog) {
+      putLog.add(put);
+    }
   }
 
   @Override
   public void put(Put put) throws IOException {
-    putLog.add(put);
+    addToPutLog(put);
+
     byte[] row = put.getRow();
     NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowData = forceFind(data, row, new TreeMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>(Bytes.BYTES_COMPARATOR));
     for (byte[] family : put.getFamilyMap().keySet()){