You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2016/12/05 16:37:50 UTC
incubator-metron git commit: METRON-597 Sporadic Failures of Profiler Integration Tests (nickwallen) closes apache/incubator-metron#383
Repository: incubator-metron
Updated Branches:
refs/heads/master 2fd7f95c2 -> 354cb6ddc
METRON-597 Sporadic Failures of Profiler Integration Tests (nickwallen) closes apache/incubator-metron#383
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/354cb6dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/354cb6dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/354cb6dd
Branch: refs/heads/master
Commit: 354cb6ddce7f5313fb140fd3ab8521f9dce0c35f
Parents: 2fd7f95
Author: nickwallen <ni...@nickallen.org>
Authored: Mon Dec 5 11:37:16 2016 -0500
Committer: Nick Allen <ni...@nickallen.org>
Committed: Mon Dec 5 11:37:16 2016 -0500
----------------------------------------------------------------------
.../integration/ProfilerIntegrationTest.java | 71 ++++++++++----------
.../org/apache/metron/test/mock/MockHTable.java | 48 ++++++++++---
2 files changed, 77 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/354cb6dd/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
index fc1c24f..e09f7f1 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
@@ -21,11 +21,12 @@
package org.apache.metron.profiler.integration;
import org.adrianwalker.multilinestring.Multiline;
+import org.apache.commons.lang.math.NumberUtils;
import org.apache.commons.math.util.MathUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.metron.common.Constants;
import org.apache.metron.common.spout.kafka.SpoutConfig;
@@ -109,6 +110,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
private static final String tableName = "profiler";
private static final String columnFamily = "P";
+ private static final double epsilon = 0.001;
/**
* A TableProvider that allows us to mock HBase.
@@ -140,12 +142,12 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
timeout(seconds(90)));
// verify - only 10.0.0.2 sends 'HTTP', thus there should be only 1 value
- List<Double> actuals = read(columnBuilder.getColumnQualifier("value"), Double.class);
- Assert.assertEquals(1, actuals.size());
+ List<Double> actuals = read(profilerTable.getPutLog(), columnFamily, columnBuilder.getColumnQualifier("value"), Double.class);
// verify - there are 5 'HTTP' each with 390 bytes
- double actual = actuals.get(0);
- Assert.assertEquals(390.0 * 5, actual, 0.01);
+ Assert.assertTrue(actuals.stream().anyMatch(val ->
+ MathUtils.equals(390.0 * 5, val, epsilon)
+ ));
}
/**
@@ -168,14 +170,17 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
timeout(seconds(90)));
// verify - expect 2 results as 2 hosts involved; 10.0.0.2 sends 'HTTP' and 10.0.0.3 send 'DNS'
- List<Double> actuals = read(columnBuilder.getColumnQualifier("value"), Double.class);
- Assert.assertEquals(expected, actuals.size());
+ List<Double> actuals = read(profilerTable.getPutLog(), columnFamily, columnBuilder.getColumnQualifier("value"), Double.class);
// verify - 10.0.0.3 -> 1/6
- Assert.assertTrue(actuals.stream().anyMatch(val -> MathUtils.equals(val, 1.0/6.0, 0.1)));
+ Assert.assertTrue(actuals.stream().anyMatch(val ->
+ MathUtils.equals(val, 1.0/6.0, epsilon)
+ ));
// verify - 10.0.0.2 -> 6/1
- Assert.assertTrue(actuals.stream().anyMatch(val -> MathUtils.equals(val, 6.0/1.0, 0.1)));
+ Assert.assertTrue(actuals.stream().anyMatch(val ->
+ MathUtils.equals(val, 6.0/1.0, epsilon)
+ ));
}
/**
@@ -195,12 +200,12 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
timeout(seconds(90)));
// verify - only 10.0.0.2 sends 'HTTP', thus there should be only 1 value
- List<Double> actuals = read(columnBuilder.getColumnQualifier("value"), Double.class);
- Assert.assertEquals(1, actuals.size());
+ List<Double> actuals = read(profilerTable.getPutLog(), columnFamily, columnBuilder.getColumnQualifier("value"), Double.class);
// verify - there are 5 'HTTP' messages each with a length of 20, thus the average should be 20
- double actual = actuals.get(0);
- Assert.assertEquals(20.0, actual, 0.01);
+ Assert.assertTrue(actuals.stream().anyMatch(val ->
+ MathUtils.equals(val, 20.0, epsilon)
+ ));
}
@Test
@@ -220,11 +225,13 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
timeout(seconds(90)));
// verify - the profile sees messages from 3 hosts; 10.0.0.[1-3]
- List<Integer> actuals = read(columnBuilder.getColumnQualifier("value"), Integer.class);
+ List<Integer> actuals = read(profilerTable.getPutLog(), columnFamily, columnBuilder.getColumnQualifier("value"), Integer.class);
Assert.assertEquals(3, actuals.size());
// verify - the profile writes 10 as an integer
- actuals.forEach(actual -> Assert.assertEquals(10.0, actual, 0.01));
+ Assert.assertTrue(actuals.stream().anyMatch(val ->
+ MathUtils.equals(val, 10.0, epsilon)
+ ));
}
@Test
@@ -240,32 +247,28 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
timeout(seconds(90)));
- List<Double> actuals = read(columnBuilder.getColumnQualifier("value"), Double.class);
-
- // verify - the profile only cares about HTTP and only 1 host sends HTTP
- Assert.assertEquals(1, actuals.size());
+ List<Double> actuals = read(profilerTable.getPutLog(), columnFamily, columnBuilder.getColumnQualifier("value"), Double.class);
// verify - the 70th percentile of 5 x 20s = 20.0
- actuals.forEach(actual -> Assert.assertEquals(20.0, actual, 0.01));
+ Assert.assertTrue(actuals.stream().anyMatch(val ->
+ MathUtils.equals(val, 20.0, epsilon)));
}
/**
* Reads a value written by the Profiler.
- *
- * @param column The column qualifier.
- * @param clazz The expected type of the result.
- * @param <T> The expected type of the result.
- * @return The value contained within the column.
+ * @param family The column family.
+ * @param qualifier The column qualifier.
+ * @param clazz The expected type of the value.
+ * @param <T> The expected type of the value.
+ * @return The value written by the Profiler.
*/
- private <T> List<T> read(byte[] column, Class<T> clazz) throws IOException {
+ private <T> List<T> read(List<Put> puts, String family, byte[] qualifier, Class<T> clazz) {
List<T> results = new ArrayList<>();
- final byte[] cf = Bytes.toBytes(columnFamily);
- ResultScanner scanner = profilerTable.getScanner(cf, column);
- for (Result result : scanner.next(10)) {
- if (result.containsColumn(cf, column)) {
- byte[] raw = result.getValue(cf, column);
- results.add(SerDeUtils.fromBytes(raw, clazz));
+ for(Put put: puts) {
+ for(Cell cell: put.get(Bytes.toBytes(family), qualifier)) {
+ T value = SerDeUtils.fromBytes(cell.getValue(), clazz);
+ results.add(value);
}
}
@@ -288,7 +291,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
setProperty("profiler.workers", "1");
setProperty("profiler.executors", "0");
setProperty("profiler.input.topic", Constants.INDEXING_TOPIC);
- setProperty("profiler.period.duration", "5");
+ setProperty("profiler.period.duration", "20");
setProperty("profiler.period.duration.units", "SECONDS");
setProperty("profiler.ttl", "30");
setProperty("profiler.ttl.units", "MINUTES");
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/354cb6dd/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
index 5f2f855..1177c75 100644
--- a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
+++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
@@ -18,6 +18,7 @@
package org.apache.metron.test.mock;
+import com.google.common.collect.ImmutableList;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
@@ -28,7 +29,18 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
@@ -37,7 +49,16 @@ import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.io.Serializable;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
/**
* MockHTable.
@@ -53,9 +74,11 @@ public class MockHTable implements HTableInterface {
HTableInterface ret = _cache.get(tableName);
return ret;
}
+
public static HTableInterface getFromCache(String tableName) {
return _cache.get(tableName);
}
+
public static HTableInterface addToCache(String tableName, String... columnFamilies) {
MockHTable ret = new MockHTable(tableName, columnFamilies);
_cache.put(tableName, ret);
@@ -70,7 +93,7 @@ public class MockHTable implements HTableInterface {
private final String tableName;
private final List<String> columnFamilies = new ArrayList<>();
private HColumnDescriptor[] descriptors;
-
+ private List<Put> putLog;
private NavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> data
= new TreeMap<>(Bytes.BYTES_COMPARATOR);
@@ -99,10 +122,11 @@ public class MockHTable implements HTableInterface {
}
public MockHTable(String tableName) {
this.tableName = tableName;
+ this.putLog = new ArrayList<>();
}
public MockHTable(String tableName, String... columnFamilies) {
- this.tableName = tableName;
+ this(tableName);
for(String cf : columnFamilies) {
addColumnFamily(cf);
}
@@ -111,6 +135,7 @@ public class MockHTable implements HTableInterface {
public int size() {
return data.size();
}
+
public void addColumnFamily(String columnFamily) {
this.columnFamilies.add(columnFamily);
descriptors = new HColumnDescriptor[columnFamilies.size()];
@@ -440,15 +465,22 @@ public class MockHTable implements HTableInterface {
return getScanner(scan);
}
- List<Put> putLog = new ArrayList<>();
-
public List<Put> getPutLog() {
- return putLog;
+ synchronized (putLog) {
+ return ImmutableList.copyOf(putLog);
+ }
+ }
+
+ public void addToPutLog(Put put) {
+ synchronized(putLog) {
+ putLog.add(put);
+ }
}
@Override
public void put(Put put) throws IOException {
- putLog.add(put);
+ addToPutLog(put);
+
byte[] row = put.getRow();
NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowData = forceFind(data, row, new TreeMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>(Bytes.BYTES_COMPARATOR));
for (byte[] family : put.getFamilyMap().keySet()){