You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/05/26 16:41:32 UTC
incubator-metron git commit: METRON-100 GeoIP errors out silently in vagrant closes apache/incubator-metron#134
Repository: incubator-metron
Updated Branches:
refs/heads/master f374daa05 -> 681b61a2b
METRON-100 GeoIP errors out silently in vagrant closes apache/incubator-metron#134
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/681b61a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/681b61a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/681b61a2
Branch: refs/heads/master
Commit: 681b61a2b8ad9873b228e11780f91a1c85897d83
Parents: f374daa
Author: cstella <ce...@gmail.com>
Authored: Thu May 26 12:41:25 2016 -0400
Committer: cstella <ce...@gmail.com>
Committed: Thu May 26 12:41:25 2016 -0400
----------------------------------------------------------------------
.../enrichment/adapters/geo/GeoAdapter.java | 8 +++-
.../enrichment/adapters/jdbc/JdbcAdapter.java | 44 ++++++++++++++++----
.../simplehbase/SimpleHBaseAdapter.java | 14 +++++--
.../threatintel/ThreatIntelAdapter.java | 15 +++++--
.../enrichment/bolt/GenericEnrichmentBolt.java | 27 ++++++++----
.../enrichment/adapters/geo/GeoAdapterTest.java | 9 +++-
.../threatintel/ThreatIntelAdapterTest.java | 4 +-
.../bolt/GenericEnrichmentBoltTest.java | 3 +-
8 files changed, 97 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/681b61a2/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
index 5d12a29..111dbff 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
@@ -24,6 +24,7 @@ import org.json.simple.JSONObject;
import java.net.InetAddress;
import java.sql.ResultSet;
+import java.sql.SQLException;
public class GeoAdapter extends JdbcAdapter {
@@ -34,10 +35,15 @@ public class GeoAdapter extends JdbcAdapter {
}
+
@SuppressWarnings("unchecked")
@Override
public JSONObject enrich(CacheKey value) {
JSONObject enriched = new JSONObject();
+ if(!resetConnectionIfNecessary()) {
+ _LOG.error("Enrichment failure, cannot maintain a connection to JDBC. Please check connection. In the meantime, I'm not enriching.");
+ return enriched;
+ }
try {
InetAddress addr = InetAddress.getByName(value.getValue());
if (addr.isAnyLocalAddress() || addr.isLoopbackAddress()
@@ -45,7 +51,7 @@ public class GeoAdapter extends JdbcAdapter {
|| !ipvalidator.isValidInet4Address(value.getValue())) {
return new JSONObject();
}
- String locidQuery = "select IPTOLOCID(\"" + value
+ String locidQuery = "select IPTOLOCID(\"" + value.getValue()
+ "\") as ANS";
ResultSet resultSet = statement.executeQuery(locidQuery);
String locid = null;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/681b61a2/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
index 9233059..708eb67 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
@@ -26,8 +26,7 @@ import java.io.Serializable;
import java.net.InetAddress;
import java.sql.*;
-public abstract class JdbcAdapter implements EnrichmentAdapter<CacheKey>,
- Serializable {
+public abstract class JdbcAdapter implements EnrichmentAdapter<CacheKey>, Serializable {
protected static final Logger _LOG = LoggerFactory
.getLogger(JdbcAdapter.class);
@@ -38,6 +37,26 @@ public abstract class JdbcAdapter implements EnrichmentAdapter<CacheKey>,
private JdbcConfig config;
private String host;
+ protected boolean isConnectionClosed() {
+ boolean isClosed = statement == null || connection == null;
+ if(!isClosed) {
+ try {
+ isClosed = statement.isClosed() || connection.isClosed();
+ } catch (SQLException e) {
+ _LOG.error("Unable to maintain open JDBC connection: " + e.getMessage(), e);
+ isClosed = true;
+ }
+ }
+ return isClosed;
+ }
+
+ protected boolean resetConnectionIfNecessary() {
+ if(isConnectionClosed()) {
+ this.cleanup();
+ return this.initializeAdapter();
+ }
+ return true;
+ }
public void setStatement(Statement statement) {
this.statement = statement;
}
@@ -48,6 +67,7 @@ public abstract class JdbcAdapter implements EnrichmentAdapter<CacheKey>,
return this;
}
+
@Override
public boolean initializeAdapter() {
try {
@@ -57,16 +77,15 @@ public abstract class JdbcAdapter implements EnrichmentAdapter<CacheKey>,
Class.forName(this.config.getClassName());
connection = DriverManager.getConnection(this.config.getJdbcUrl());
connection.setReadOnly(true);
- if (!connection.isValid(0))
+ if (!connection.isValid(0)) {
throw new Exception("Invalid connection string....");
+ }
statement = connection.createStatement(
ResultSet.TYPE_SCROLL_INSENSITIVE,
ResultSet.CONCUR_READ_ONLY);
return true;
} catch (Exception e) {
- e.printStackTrace();
_LOG.error("[Metron] JDBC connection failed....", e);
-
return false;
}
}
@@ -74,10 +93,19 @@ public abstract class JdbcAdapter implements EnrichmentAdapter<CacheKey>,
@Override
public void cleanup() {
try {
- if (statement != null) statement.close();
- if (connection != null) connection.close();
+ if (statement != null) {
+ statement.close();
+ }
} catch (SQLException e) {
- e.printStackTrace();
+ _LOG.error("[Metron] JDBC statement close failed....", e);
+ }
+ try {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ catch(SQLException e) {
+ _LOG.error("[Metron] JDBC connection close failed....", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/681b61a2/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java
index 22629a4..190ed5a 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java
@@ -60,13 +60,19 @@ public class SimpleHBaseAdapter implements EnrichmentAdapter<CacheKey>,Serializa
}
+ public boolean isInitialized() {
+ return lookup != null && lookup.getTable() != null;
+ }
@Override
public JSONObject enrich(CacheKey value) {
JSONObject enriched = new JSONObject();
+ if(!isInitialized()) {
+ initializeAdapter();
+ }
List<String> enrichmentTypes = value.getConfig()
.getEnrichment().getFieldToTypeMap()
.get(EnrichmentUtils.toTopLevelField(value.getField()));
- if(enrichmentTypes != null && value.getValue() != null) {
+ if(isInitialized() && enrichmentTypes != null && value.getValue() != null) {
try {
for (LookupKV<EnrichmentKey, EnrichmentValue> kv :
lookup.get(Iterables.transform(enrichmentTypes
@@ -87,6 +93,7 @@ public class SimpleHBaseAdapter implements EnrichmentAdapter<CacheKey>,Serializa
}
catch (IOException e) {
_LOG.error("Unable to retrieve value: " + e.getMessage(), e);
+ initializeAdapter();
throw new RuntimeException("Unable to retrieve value: " + e.getMessage(), e);
}
}
@@ -103,7 +110,8 @@ public class SimpleHBaseAdapter implements EnrichmentAdapter<CacheKey>,Serializa
, new NoopAccessTracker()
);
} catch (IOException e) {
- throw new RuntimeException("Unable to initialize adapter: " + e.getMessage(), e);
+ _LOG.error("Unable to initialize adapter: " + e.getMessage(), e);
+ return false;
}
return true;
}
@@ -113,7 +121,7 @@ public class SimpleHBaseAdapter implements EnrichmentAdapter<CacheKey>,Serializa
try {
lookup.close();
} catch (Exception e) {
- throw new RuntimeException("Unable to cleanup access tracker", e);
+ _LOG.error("Unable to cleanup access tracker", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/681b61a2/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapter.java
index ee5636b..603f934 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapter.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapter.java
@@ -65,11 +65,14 @@ public class ThreatIntelAdapter implements EnrichmentAdapter<CacheKey>,Serializa
@Override
public JSONObject enrich(CacheKey value) {
+ if(!isInitialized()) {
+ initializeAdapter();
+ }
JSONObject enriched = new JSONObject();
List<String> enrichmentTypes = value.getConfig()
.getThreatIntel().getFieldToTypeMap()
.get(EnrichmentUtils.toTopLevelField(value.getField()));
- if(enrichmentTypes != null) {
+ if(isInitialized() && enrichmentTypes != null) {
int i = 0;
try {
for (Boolean isThreat :
@@ -89,13 +92,18 @@ public class ThreatIntelAdapter implements EnrichmentAdapter<CacheKey>,Serializa
}
}
catch(IOException e) {
+ _LOG.error("Unable to retrieve value: " + e.getMessage(), e);
+ initializeAdapter();
throw new RuntimeException("Unable to retrieve value", e);
}
}
- //throw new RuntimeException("Unable to retrieve value " + value);
return enriched;
}
+ public boolean isInitialized() {
+ return lookup != null && lookup.getTable() != null;
+ }
+
@Override
public boolean initializeAdapter() {
PersistentAccessTracker accessTracker;
@@ -117,7 +125,8 @@ public class ThreatIntelAdapter implements EnrichmentAdapter<CacheKey>,Serializa
);
lookup = new EnrichmentLookup(config.getProvider().getTable(hbaseConfig, hbaseTable), config.getHBaseCF(), accessTracker);
} catch (IOException e) {
- throw new IllegalStateException("Unable to initialize ThreatIntelAdapter", e);
+ _LOG.error("Unable to initialize ThreatIntelAdapter", e);
+ return false;
}
return true;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/681b61a2/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
index 234c795..3a4b67d 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -176,6 +176,7 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt {
else {
throw new RuntimeException("Source type is missing from enrichment fragment: " + rawMessage.toJSONString());
}
+ boolean error = false;
for (Object o : rawMessage.keySet()) {
String field = (String) o;
String value = (String) rawMessage.get(field);
@@ -186,14 +187,23 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt {
if (value != null && value.length() != 0) {
SensorEnrichmentConfig config = configurations.getSensorEnrichmentConfig(sourceType);
if(config == null) {
- throw new RuntimeException("Unable to find " + config);
+ LOG.error("Unable to find " + config);
+ error = true;
+ continue;
}
CacheKey cacheKey= new CacheKey(field, value, config);
- adapter.logAccess(cacheKey);
- enrichedField = cache.getUnchecked(cacheKey);
- if (enrichedField == null)
- throw new Exception("[Metron] Could not enrich string: "
- + value);
+ try {
+ adapter.logAccess(cacheKey);
+ enrichedField = cache.getUnchecked(cacheKey);
+ if (enrichedField == null)
+ throw new Exception("[Metron] Could not enrich string: "
+ + value);
+ }
+ catch(Exception e) {
+ LOG.error(e.getMessage(), e);
+ error = true;
+ continue;
+ }
}
if (!enrichedField.isEmpty()) {
for (Object enrichedKey : enrichedField.keySet()) {
@@ -206,7 +216,10 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt {
}
enrichedMessage.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".end.ts", "" + System.currentTimeMillis());
- if (!enrichedMessage.isEmpty()) {
+ if(error) {
+ throw new Exception("Unable to enrich " + enrichedMessage + " check logs for specifics.");
+ }
+ if (enrichedMessage != null && !enrichedMessage.isEmpty()) {
collector.emit(enrichmentType, new Values(key, enrichedMessage));
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/681b61a2/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoAdapterTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoAdapterTest.java
index ec90c49..ac5adde 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoAdapterTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoAdapterTest.java
@@ -66,7 +66,7 @@ public class GeoAdapterTest {
JSONParser jsonParser = new JSONParser();
expectedMessage = (JSONObject) jsonParser.parse(expectedMessageString);
MockitoAnnotations.initMocks(this);
- when(statetment.executeQuery("select IPTOLOCID(\"CacheKey{field='dummy', value='72.163.4.161'}\") as ANS")).thenReturn(resultSet);
+ when(statetment.executeQuery("select IPTOLOCID(\"" + ip + "\") as ANS")).thenReturn(resultSet);
when(statetment.executeQuery("select * from location where locID = 1")).thenReturn(resultSet1);
when(resultSet.next()).thenReturn(Boolean.TRUE, Boolean.FALSE);
when(resultSet.getString("ANS")).thenReturn("1");
@@ -83,7 +83,12 @@ public class GeoAdapterTest {
@Test
public void testEnrich() throws Exception {
- GeoAdapter geo = new GeoAdapter();
+ GeoAdapter geo = new GeoAdapter() {
+ @Override
+ public boolean initializeAdapter() {
+ return true;
+ }
+ };
geo.setStatement(statetment);
JSONObject actualMessage = geo.enrich(new CacheKey("dummy", ip, null));
Assert.assertNotNull(actualMessage.get("locID"));
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/681b61a2/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java
index 2afeb5b..f420b01 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java
@@ -122,7 +122,7 @@ public class ThreatIntelAdapterTest {
Assert.assertEquals(expectedMessage, actualMessage);
}
- @Test(expected = IllegalStateException.class)
+ @Test
public void testInitializeAdapter() {
String cf = "cf";
@@ -145,7 +145,7 @@ public class ThreatIntelAdapterTest {
ThreatIntelAdapter tia = new ThreatIntelAdapter(config);
tia.initializeAdapter();
-
+ Assert.assertFalse(tia.isInitialized());
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/681b61a2/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
index a9a1637..31168ff 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
@@ -18,6 +18,7 @@
package org.apache.metron.enrichment.bolt;
import backtype.storm.tuple.Values;
+import com.google.common.collect.ImmutableMap;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.metron.TestConstants;
import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
@@ -177,7 +178,7 @@ public class GenericEnrichmentBoltTest extends BaseEnrichmentBoltTest {
when(tuple.getStringByField("key")).thenReturn(key);
when(tuple.getValueByField("message")).thenReturn(originalMessage);
genericEnrichmentBolt.execute(tuple);
- verify(outputCollector, times(1)).emit(eq(enrichmentType), argThat(new EnrichedMessageMatcher(key, new JSONObject())));
+ verify(outputCollector, times(1)).emit(eq(enrichmentType), argThat(new EnrichedMessageMatcher(key, new JSONObject(ImmutableMap.of("source.type", "test")))));
reset(enrichmentAdapter);
SensorEnrichmentConfig sensorEnrichmentConfig = SensorEnrichmentConfig.