You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/05/26 16:41:32 UTC

incubator-metron git commit: METRON-100 GeoIP errors out silently in vagrant closes apache/incubator-metron#134

Repository: incubator-metron
Updated Branches:
  refs/heads/master f374daa05 -> 681b61a2b


METRON-100 GeoIP errors out silently in vagrant closes apache/incubator-metron#134


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/681b61a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/681b61a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/681b61a2

Branch: refs/heads/master
Commit: 681b61a2b8ad9873b228e11780f91a1c85897d83
Parents: f374daa
Author: cstella <ce...@gmail.com>
Authored: Thu May 26 12:41:25 2016 -0400
Committer: cstella <ce...@gmail.com>
Committed: Thu May 26 12:41:25 2016 -0400

----------------------------------------------------------------------
 .../enrichment/adapters/geo/GeoAdapter.java     |  8 +++-
 .../enrichment/adapters/jdbc/JdbcAdapter.java   | 44 ++++++++++++++++----
 .../simplehbase/SimpleHBaseAdapter.java         | 14 +++++--
 .../threatintel/ThreatIntelAdapter.java         | 15 +++++--
 .../enrichment/bolt/GenericEnrichmentBolt.java  | 27 ++++++++----
 .../enrichment/adapters/geo/GeoAdapterTest.java |  9 +++-
 .../threatintel/ThreatIntelAdapterTest.java     |  4 +-
 .../bolt/GenericEnrichmentBoltTest.java         |  3 +-
 8 files changed, 97 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/681b61a2/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
index 5d12a29..111dbff 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
@@ -24,6 +24,7 @@ import org.json.simple.JSONObject;
 
 import java.net.InetAddress;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 
 public class GeoAdapter extends JdbcAdapter {
 
@@ -34,10 +35,15 @@ public class GeoAdapter extends JdbcAdapter {
 
   }
 
+
   @SuppressWarnings("unchecked")
   @Override
   public JSONObject enrich(CacheKey value) {
     JSONObject enriched = new JSONObject();
+    if(!resetConnectionIfNecessary()) {
+      _LOG.error("Enrichment failure, cannot maintain a connection to JDBC.  Please check connection.  In the meantime, I'm not enriching.");
+      return enriched;
+    }
     try {
       InetAddress addr = InetAddress.getByName(value.getValue());
       if (addr.isAnyLocalAddress() || addr.isLoopbackAddress()
@@ -45,7 +51,7 @@ public class GeoAdapter extends JdbcAdapter {
               || !ipvalidator.isValidInet4Address(value.getValue())) {
         return new JSONObject();
       }
-      String locidQuery = "select IPTOLOCID(\"" + value
+      String locidQuery = "select IPTOLOCID(\"" + value.getValue()
               + "\") as ANS";
       ResultSet resultSet = statement.executeQuery(locidQuery);
       String locid = null;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/681b61a2/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
index 9233059..708eb67 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
@@ -26,8 +26,7 @@ import java.io.Serializable;
 import java.net.InetAddress;
 import java.sql.*;
 
-public abstract class JdbcAdapter implements EnrichmentAdapter<CacheKey>,
-        Serializable {
+public abstract class JdbcAdapter implements EnrichmentAdapter<CacheKey>, Serializable {
 
   protected static final Logger _LOG = LoggerFactory
           .getLogger(JdbcAdapter.class);
@@ -38,6 +37,26 @@ public abstract class JdbcAdapter implements EnrichmentAdapter<CacheKey>,
   private JdbcConfig config;
   private String host;
 
+  protected boolean isConnectionClosed() {
+    boolean isClosed = statement == null || connection == null;
+    if(!isClosed) {
+      try {
+        isClosed = statement.isClosed() || connection.isClosed();
+      } catch (SQLException e) {
+        _LOG.error("Unable to maintain open JDBC connection: " + e.getMessage(), e);
+        isClosed = true;
+      }
+    }
+    return isClosed;
+  }
+
+  protected boolean resetConnectionIfNecessary() {
+    if(isConnectionClosed()) {
+      this.cleanup();
+      return this.initializeAdapter();
+    }
+    return true;
+  }
   public void setStatement(Statement statement) {
     this.statement = statement;
   }
@@ -48,6 +67,7 @@ public abstract class JdbcAdapter implements EnrichmentAdapter<CacheKey>,
     return this;
   }
 
+
   @Override
   public boolean initializeAdapter() {
     try {
@@ -57,16 +77,15 @@ public abstract class JdbcAdapter implements EnrichmentAdapter<CacheKey>,
       Class.forName(this.config.getClassName());
       connection = DriverManager.getConnection(this.config.getJdbcUrl());
       connection.setReadOnly(true);
-      if (!connection.isValid(0))
+      if (!connection.isValid(0)) {
         throw new Exception("Invalid connection string....");
+      }
       statement = connection.createStatement(
               ResultSet.TYPE_SCROLL_INSENSITIVE,
               ResultSet.CONCUR_READ_ONLY);
       return true;
     } catch (Exception e) {
-      e.printStackTrace();
       _LOG.error("[Metron] JDBC connection failed....", e);
-
       return false;
     }
   }
@@ -74,10 +93,19 @@ public abstract class JdbcAdapter implements EnrichmentAdapter<CacheKey>,
   @Override
   public void cleanup() {
     try {
-      if (statement != null) statement.close();
-      if (connection != null) connection.close();
+      if (statement != null) {
+        statement.close();
+      }
     } catch (SQLException e) {
-      e.printStackTrace();
+      _LOG.error("[Metron] JDBC statement close failed....", e);
+    }
+    try {
+      if (connection != null) {
+        connection.close();
+      }
+    }
+    catch(SQLException e) {
+      _LOG.error("[Metron] JDBC connection close failed....", e);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/681b61a2/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java
index 22629a4..190ed5a 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java
@@ -60,13 +60,19 @@ public class SimpleHBaseAdapter implements EnrichmentAdapter<CacheKey>,Serializa
   }
 
 
+  public boolean isInitialized() {
+    return lookup != null && lookup.getTable() != null;
+  }
   @Override
   public JSONObject enrich(CacheKey value) {
     JSONObject enriched = new JSONObject();
+    if(!isInitialized()) {
+      initializeAdapter();
+    }
     List<String> enrichmentTypes = value.getConfig()
                                         .getEnrichment().getFieldToTypeMap()
                                         .get(EnrichmentUtils.toTopLevelField(value.getField()));
-    if(enrichmentTypes != null && value.getValue() != null) {
+    if(isInitialized() && enrichmentTypes != null && value.getValue() != null) {
       try {
         for (LookupKV<EnrichmentKey, EnrichmentValue> kv :
                 lookup.get(Iterables.transform(enrichmentTypes
@@ -87,6 +93,7 @@ public class SimpleHBaseAdapter implements EnrichmentAdapter<CacheKey>,Serializa
       }
       catch (IOException e) {
         _LOG.error("Unable to retrieve value: " + e.getMessage(), e);
+        initializeAdapter();
         throw new RuntimeException("Unable to retrieve value: " + e.getMessage(), e);
       }
     }
@@ -103,7 +110,8 @@ public class SimpleHBaseAdapter implements EnrichmentAdapter<CacheKey>,Serializa
                                    , new NoopAccessTracker()
                                    );
     } catch (IOException e) {
-      throw new RuntimeException("Unable to initialize adapter: " + e.getMessage(), e);
+      _LOG.error("Unable to initialize adapter: " + e.getMessage(), e);
+      return false;
     }
     return true;
   }
@@ -113,7 +121,7 @@ public class SimpleHBaseAdapter implements EnrichmentAdapter<CacheKey>,Serializa
     try {
       lookup.close();
     } catch (Exception e) {
-      throw new RuntimeException("Unable to cleanup access tracker", e);
+      _LOG.error("Unable to cleanup access tracker", e);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/681b61a2/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapter.java
index ee5636b..603f934 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapter.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapter.java
@@ -65,11 +65,14 @@ public class ThreatIntelAdapter implements EnrichmentAdapter<CacheKey>,Serializa
 
   @Override
   public JSONObject enrich(CacheKey value) {
+    if(!isInitialized()) {
+      initializeAdapter();
+    }
     JSONObject enriched = new JSONObject();
     List<String> enrichmentTypes = value.getConfig()
                                         .getThreatIntel().getFieldToTypeMap()
                                         .get(EnrichmentUtils.toTopLevelField(value.getField()));
-    if(enrichmentTypes != null) {
+    if(isInitialized() && enrichmentTypes != null) {
       int i = 0;
       try {
         for (Boolean isThreat :
@@ -89,13 +92,18 @@ public class ThreatIntelAdapter implements EnrichmentAdapter<CacheKey>,Serializa
         }
       }
       catch(IOException e) {
+        _LOG.error("Unable to retrieve value: " + e.getMessage(), e);
+        initializeAdapter();
         throw new RuntimeException("Unable to retrieve value", e);
       }
     }
-    //throw new RuntimeException("Unable to retrieve value " + value);
     return enriched;
   }
 
+  public boolean isInitialized() {
+    return lookup != null && lookup.getTable() != null;
+  }
+
   @Override
   public boolean initializeAdapter() {
     PersistentAccessTracker accessTracker;
@@ -117,7 +125,8 @@ public class ThreatIntelAdapter implements EnrichmentAdapter<CacheKey>,Serializa
       );
       lookup = new EnrichmentLookup(config.getProvider().getTable(hbaseConfig, hbaseTable), config.getHBaseCF(), accessTracker);
     } catch (IOException e) {
-      throw new IllegalStateException("Unable to initialize ThreatIntelAdapter", e);
+      _LOG.error("Unable to initialize ThreatIntelAdapter", e);
+      return false;
     }
 
     return true;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/681b61a2/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
index 234c795..3a4b67d 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -176,6 +176,7 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt {
       else {
         throw new RuntimeException("Source type is missing from enrichment fragment: " + rawMessage.toJSONString());
       }
+      boolean error = false;
       for (Object o : rawMessage.keySet()) {
         String field = (String) o;
         String value = (String) rawMessage.get(field);
@@ -186,14 +187,23 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt {
           if (value != null && value.length() != 0) {
             SensorEnrichmentConfig config = configurations.getSensorEnrichmentConfig(sourceType);
             if(config == null) {
-              throw new RuntimeException("Unable to find " + config);
+              LOG.error("Unable to find " + config);
+              error = true;
+              continue;
             }
             CacheKey cacheKey= new CacheKey(field, value, config);
-            adapter.logAccess(cacheKey);
-            enrichedField = cache.getUnchecked(cacheKey);
-            if (enrichedField == null)
-              throw new Exception("[Metron] Could not enrich string: "
-                      + value);
+            try {
+              adapter.logAccess(cacheKey);
+              enrichedField = cache.getUnchecked(cacheKey);
+              if (enrichedField == null)
+                throw new Exception("[Metron] Could not enrich string: "
+                        + value);
+            }
+            catch(Exception e) {
+              LOG.error(e.getMessage(), e);
+              error = true;
+              continue;
+            }
           }
           if (!enrichedField.isEmpty()) {
             for (Object enrichedKey : enrichedField.keySet()) {
@@ -206,7 +216,10 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt {
       }
 
       enrichedMessage.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".end.ts", "" + System.currentTimeMillis());
-      if (!enrichedMessage.isEmpty()) {
+      if(error) {
+        throw new Exception("Unable to enrich " + enrichedMessage + " check logs for specifics.");
+      }
+      if (enrichedMessage != null && !enrichedMessage.isEmpty()) {
         collector.emit(enrichmentType, new Values(key, enrichedMessage));
       }
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/681b61a2/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoAdapterTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoAdapterTest.java
index ec90c49..ac5adde 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoAdapterTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoAdapterTest.java
@@ -66,7 +66,7 @@ public class GeoAdapterTest {
     JSONParser jsonParser = new JSONParser();
     expectedMessage = (JSONObject) jsonParser.parse(expectedMessageString);
     MockitoAnnotations.initMocks(this);
-    when(statetment.executeQuery("select IPTOLOCID(\"CacheKey{field='dummy', value='72.163.4.161'}\") as ANS")).thenReturn(resultSet);
+    when(statetment.executeQuery("select IPTOLOCID(\"" + ip + "\") as ANS")).thenReturn(resultSet);
     when(statetment.executeQuery("select * from location where locID = 1")).thenReturn(resultSet1);
     when(resultSet.next()).thenReturn(Boolean.TRUE, Boolean.FALSE);
     when(resultSet.getString("ANS")).thenReturn("1");
@@ -83,7 +83,12 @@ public class GeoAdapterTest {
 
   @Test
   public void testEnrich() throws Exception {
-    GeoAdapter geo = new GeoAdapter();
+    GeoAdapter geo = new GeoAdapter() {
+      @Override
+      public boolean initializeAdapter() {
+        return true;
+      }
+    };
     geo.setStatement(statetment);
     JSONObject actualMessage = geo.enrich(new CacheKey("dummy", ip, null));
     Assert.assertNotNull(actualMessage.get("locID"));

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/681b61a2/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java
index 2afeb5b..f420b01 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java
@@ -122,7 +122,7 @@ public class ThreatIntelAdapterTest {
     Assert.assertEquals(expectedMessage, actualMessage);
   }
 
-  @Test(expected = IllegalStateException.class)
+  @Test
   public void testInitializeAdapter() {
 
     String cf = "cf";
@@ -145,7 +145,7 @@ public class ThreatIntelAdapterTest {
 
     ThreatIntelAdapter tia = new ThreatIntelAdapter(config);
     tia.initializeAdapter();
-
+    Assert.assertFalse(tia.isInitialized());
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/681b61a2/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
index a9a1637..31168ff 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
@@ -18,6 +18,7 @@
 package org.apache.metron.enrichment.bolt;
 
 import backtype.storm.tuple.Values;
+import com.google.common.collect.ImmutableMap;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.TestConstants;
 import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
@@ -177,7 +178,7 @@ public class GenericEnrichmentBoltTest extends BaseEnrichmentBoltTest {
     when(tuple.getStringByField("key")).thenReturn(key);
     when(tuple.getValueByField("message")).thenReturn(originalMessage);
     genericEnrichmentBolt.execute(tuple);
-    verify(outputCollector, times(1)).emit(eq(enrichmentType), argThat(new EnrichedMessageMatcher(key, new JSONObject())));
+    verify(outputCollector, times(1)).emit(eq(enrichmentType), argThat(new EnrichedMessageMatcher(key, new JSONObject(ImmutableMap.of("source.type", "test")))));
     reset(enrichmentAdapter);
 
     SensorEnrichmentConfig sensorEnrichmentConfig = SensorEnrichmentConfig.