You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/01/26 15:18:12 UTC
[34/89] [abbrv] [partial] incubator-metron git commit: Rename all
OpenSOC files to Metron
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Pcap_Service/README.txt
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/README.txt b/metron-streaming/Metron-Pcap_Service/README.txt
new file mode 100644
index 0000000..8aba23e
--- /dev/null
+++ b/metron-streaming/Metron-Pcap_Service/README.txt
@@ -0,0 +1,16 @@
+The 'hbase' module of the 'opensoc' project contains the code that communicates with HBase. This module has several APIs ( refer to the IPcapGetter.java and IPcapScanner.java files )
+to fetch pcaps from HBase. The following APIs have been created under this module implementation.
+
+APIs ( in IPcapGetter.java) to get pcaps using keys :
+ 1. public PcapsResponse getPcaps(List<String> keys, String lastRowKey, long startTime, long endTime, boolean includeReverseTraffic, boolean includeDuplicateLastRow, long maxResultSize) throws IOException;
+ 2. public PcapsResponse getPcaps(String key, long startTime, long endTime, boolean includeReverseTraffic) throws IOException;
+ 3. public PcapsResponse getPcaps(List<String> keys) throws IOException;
+ 4. public PcapsResponse getPcaps(String key) throws IOException;
+
+APIs ( in IPcapScanner.java) to get pcaps using key range :
+ 1. public byte[] getPcaps(String startKey, String endKey, long maxResponseSize, long startTime, long endTime) throws IOException;
+ 2. public byte[] getPcaps(String startKey, String endKey) throws IOException;
+
+
+Refer to the wiki documentation for further details : https://hwcsco.atlassian.net/wiki/pages/viewpage.action?pageId=5242892
+
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Pcap_Service/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/pom.xml b/metron-streaming/Metron-Pcap_Service/pom.xml
new file mode 100644
index 0000000..ecbce82
--- /dev/null
+++ b/metron-streaming/Metron-Pcap_Service/pom.xml
@@ -0,0 +1,267 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.opensoc</groupId>
+ <artifactId>OpenSOC-Streaming</artifactId>
+ <version>0.6BETA</version>
+ </parent>
+ <artifactId>OpenSOC-Pcap_Service</artifactId>
+ <description>OpenSOC Pcap_Service</description>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <flume.version>1.4.0.2.0.6.0-76</flume.version>
+ <hadoop.version>2.2.0.2.0.6.0-76</hadoop.version>
+ <maven.compiler.target>1.7</maven.compiler.target>
+ <maven.compiler.source>1.7</maven.compiler.source>
+ <storm.version>0.9.2-incubating</storm.version>
+ <kafka.version>0.8.0</kafka.version>
+ <slf4j.version>1.7.5</slf4j.version>
+ <zookeeper.version>3.4.5.2.0.6.0-76</zookeeper.version>
+ <logger.version>1.2.15</logger.version>
+
+ <storm-kafka.version>0.9.2-incubating</storm-kafka.version>
+ <storm-hdfs.version>0.0.7-SNAPSHOT</storm-hdfs.version>
+ <storm-hbase.version>0.0.5-SNAPSHOT</storm-hbase.version>
+
+ <spring.integration.version>3.0.0.RELEASE</spring.integration.version>
+ <spring.version>3.2.6.RELEASE</spring.version>
+ <commons-fileupload.version>1.2.2</commons-fileupload.version>
+ <commons-io.version>2.4</commons-io.version>
+ <commons-configuration.version>1.10</commons-configuration.version>
+ <commons-lang.version>2.6</commons-lang.version>
+ <commons-collections.version>3.2.1</commons-collections.version>
+ <commons-beanutils.version>1.8.3</commons-beanutils.version>
+ <commons-jexl.version>2.1.1</commons-jexl.version>
+
+
+ <junit.version>4.11</junit.version>
+ <hamcrest.version>1.3</hamcrest.version>
+ <mockito.version>1.9.5</mockito.version>
+ <elastic-search.version>1.3.0</elastic-search.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.jboss.resteasy</groupId>
+ <artifactId>jaxrs-api</artifactId>
+ <version>3.0.4.Final</version>
+ </dependency>
+ <dependency>
+ <groupId>com.opensoc</groupId>
+ <artifactId>OpenSOC-Common</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-beanutils</groupId>
+ <artifactId>commons-beanutils</artifactId>
+ <version>${commons-beanutils.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-jexl</artifactId>
+ <version>${commons-jexl.version}</version>
+ </dependency>
+
+ <dependency>
+ <artifactId>commons-configuration</artifactId>
+ <groupId>commons-configuration</groupId>
+ <version>${commons-configuration.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ <version>1.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-core</artifactId>
+ <version>1.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>1.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>2.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${global_hbase_version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-testing-util</artifactId>
+ <version>${global_hbase_version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${global_hadoop_version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${global_hadoop_version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.integration</groupId>
+ <artifactId>spring-integration-http</artifactId>
+ <version>${spring.integration.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-webmvc</artifactId>
+ <version>${spring.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>${logger.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.jms</groupId>
+ <artifactId>jms</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+
+
+
+
+ <dependency>
+ <groupId>org.jboss.resteasy</groupId>
+ <artifactId>resteasy-jaxrs</artifactId>
+ <version>3.0.1.Final</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.resteasy</groupId>
+ <artifactId>resteasy-jaxb-provider</artifactId>
+ <version>3.0.1.Final</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.resteasy</groupId>
+ <artifactId>async-http-servlet-3.0</artifactId>
+ <version>3.0.1.Final</version>
+ <scope>compile</scope>
+ </dependency>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <version>9.3.0.M0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ <version>9.3.0.M0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <version>${global_slf4j_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${global_slf4j_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${global_slf4j_version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>com.opensoc.pcapservice.rest.PcapService</mainClass>
+ </manifest>
+ </archive>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id> <!-- this is used for inheritance merges -->
+ <phase>package</phase> <!-- bind to the packaging phase -->
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Pcap_Service/pom.xml.versionsBackup
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/pom.xml.versionsBackup b/metron-streaming/Metron-Pcap_Service/pom.xml.versionsBackup
new file mode 100644
index 0000000..a400fe2
--- /dev/null
+++ b/metron-streaming/Metron-Pcap_Service/pom.xml.versionsBackup
@@ -0,0 +1,268 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.opensoc</groupId>
+ <artifactId>OpenSOC-Streaming</artifactId>
+ <version>0.4BETA</version>
+ </parent>
+ <artifactId>OpenSOC-Pcap_Service</artifactId>
+ <description>OpenSOC Pcap_Service</description>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <flume.version>1.4.0.2.0.6.0-76</flume.version>
+ <hadoop.version>2.2.0.2.0.6.0-76</hadoop.version>
+ <maven.compiler.source>${jdk.version}</maven.compiler.source>
+ <maven.compiler.target>${jdk.version}</maven.compiler.target>
+
+ <storm.version>0.9.2-incubating</storm.version>
+ <kafka.version>0.8.0</kafka.version>
+ <slf4j.version>1.7.5</slf4j.version>
+ <zookeeper.version>3.4.5.2.0.6.0-76</zookeeper.version>
+ <logger.version>1.2.15</logger.version>
+
+ <storm-kafka.version>0.9.2-incubating</storm-kafka.version>
+ <storm-hdfs.version>0.0.7-SNAPSHOT</storm-hdfs.version>
+ <storm-hbase.version>0.0.5-SNAPSHOT</storm-hbase.version>
+
+ <spring.integration.version>3.0.0.RELEASE</spring.integration.version>
+ <spring.version>3.2.6.RELEASE</spring.version>
+ <commons-fileupload.version>1.2.2</commons-fileupload.version>
+ <commons-io.version>2.4</commons-io.version>
+ <commons-configuration.version>1.10</commons-configuration.version>
+ <commons-lang.version>2.6</commons-lang.version>
+ <commons-collections.version>3.2.1</commons-collections.version>
+ <commons-beanutils.version>1.8.3</commons-beanutils.version>
+ <commons-jexl.version>2.1.1</commons-jexl.version>
+
+
+ <junit.version>4.11</junit.version>
+ <hamcrest.version>1.3</hamcrest.version>
+ <mockito.version>1.9.5</mockito.version>
+ <elastic-search.version>1.3.0</elastic-search.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.jboss.resteasy</groupId>
+ <artifactId>jaxrs-api</artifactId>
+ <version>3.0.4.Final</version>
+ </dependency>
+ <dependency>
+ <groupId>com.opensoc</groupId>
+ <artifactId>OpenSOC-Common</artifactId>
+ <version>${parent.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-beanutils</groupId>
+ <artifactId>commons-beanutils</artifactId>
+ <version>${commons-beanutils.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-jexl</artifactId>
+ <version>${commons-jexl.version}</version>
+ </dependency>
+
+ <dependency>
+ <artifactId>commons-configuration</artifactId>
+ <groupId>commons-configuration</groupId>
+ <version>${commons-configuration.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ <version>1.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-core</artifactId>
+ <version>1.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>1.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>2.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${global_hbase_version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-testing-util</artifactId>
+ <version>${global_hbase_version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${global_hadoop_version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${global_hadoop_version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.integration</groupId>
+ <artifactId>spring-integration-http</artifactId>
+ <version>${spring.integration.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-webmvc</artifactId>
+ <version>${spring.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>${logger.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.jms</groupId>
+ <artifactId>jms</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+
+
+
+
+ <dependency>
+ <groupId>org.jboss.resteasy</groupId>
+ <artifactId>resteasy-jaxrs</artifactId>
+ <version>3.0.1.Final</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.resteasy</groupId>
+ <artifactId>resteasy-jaxb-provider</artifactId>
+ <version>3.0.1.Final</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.resteasy</groupId>
+ <artifactId>async-http-servlet-3.0</artifactId>
+ <version>3.0.1.Final</version>
+ <scope>compile</scope>
+ </dependency>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <version>9.3.0.M0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ <version>9.3.0.M0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <version>${global_slf4j_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${global_slf4j_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${global_slf4j_version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>com.opensoc.pcapservice.rest.PcapService</mainClass>
+ </manifest>
+ </archive>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id> <!-- this is used for inheritance merges -->
+ <phase>package</phase> <!-- bind to the packaging phase -->
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/CellTimestampComparator.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/CellTimestampComparator.java b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/CellTimestampComparator.java
new file mode 100644
index 0000000..e45d849
--- /dev/null
+++ b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/CellTimestampComparator.java
@@ -0,0 +1,23 @@
+package com.opensoc.pcapservice;
+
+import java.util.Comparator;
+
+import org.apache.hadoop.hbase.Cell;
+
+/**
+ * Comparator that orders HBase {@link Cell}s by their timestamp, ascending
+ * (oldest first).
+ *
+ * @author Sayi
+ */
+public class CellTimestampComparator implements Comparator<Cell> {
+
+  /**
+   * Compares two cells by timestamp.
+   *
+   * @param o1 the first cell
+   * @param o2 the second cell
+   * @return a negative integer, zero, or a positive integer as o1's timestamp
+   *         is less than, equal to, or greater than o2's timestamp
+   */
+  @Override
+  public int compare(Cell o1, Cell o2) {
+    // Long.compare (Java 7+) avoids the boxing done by
+    // Long.valueOf(...).compareTo(...) in the original implementation.
+    return Long.compare(o1.getTimestamp(), o2.getTimestamp());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/ConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/ConfigurationUtil.java b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/ConfigurationUtil.java
new file mode 100644
index 0000000..be1a1bf
--- /dev/null
+++ b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/ConfigurationUtil.java
@@ -0,0 +1,269 @@
+package com.opensoc.pcapservice;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.springframework.util.Assert;
+
+import com.opensoc.configuration.ConfigurationManager;
+
+
+
+/**
+ * Utility class for this module which loads commons configuration to fetch
+ * properties from underlying resources to communicate with hbase.
+ *
+ * All size-returning methods normalize configured values to bytes using the
+ * configured unit (KB or MB).
+ *
+ * @author Sayi
+ */
+public class ConfigurationUtil {
+
+  /** Configuration definition file name for fetching pcaps from hbase. */
+  private static final String configDefFileName = "config-definition-hbase.xml";
+
+  /** Lazily-loaded property configuration; see getConfiguration(). */
+  private static Configuration propConfiguration = null;
+
+
+  /**
+   * Unit of a configured size value.
+   */
+  public enum SizeUnit {
+
+    /** Kilobytes. */
+    KB,
+    /** Megabytes. */
+    MB
+  };
+
+  /** Default retry limit when 'hbase.hconnection.retries.number' is absent. */
+  private static final int DEFAULT_HCONNECTION_RETRY_LIMIT = 0;
+
+  /**
+   * Loads configuration resources on first use and caches them.
+   *
+   * @return Configuration
+   */
+  public static Configuration getConfiguration() {
+    if (propConfiguration == null) {
+      propConfiguration = ConfigurationManager.getConfiguration(configDefFileName);
+    }
+    return propConfiguration;
+  }
+
+  /**
+   * Returns the configured default result size in bytes, if the user input is
+   * null; otherwise, returns the user input (converted to bytes) after
+   * validating against the configured max value. Throws
+   * IllegalArgumentException if : 1. input is less than or equals to 0 OR
+   * 2. input is greater than configured {hbase.scan.max.result.size} value
+   *
+   * @param input
+   *          the input, expressed in the configured result-size unit
+   * @return the validated size in bytes
+   */
+  public static long validateMaxResultSize(String input) {
+    if (input == null) {
+      return getDefaultResultSize();
+    }
+    // convert the user input to bytes exactly once
+    long value = convertToBytes(Long.parseLong(input), getResultSizeUnit());
+    Assert.isTrue(
+        isAllowableResultSize(value),
+        "'maxResponseSize' param value must be positive and less than {hbase.scan.max.result.size} value");
+    // BUG FIX: the original converted 'value' to bytes a second time here,
+    // inflating the result by the unit factor; 'value' is already in bytes.
+    return value;
+  }
+
+  /**
+   * Checks if the given size (in bytes) is within the allowed range.
+   *
+   * @param input
+   *          the size in bytes
+   * @return true, if 0 &lt; input &lt;= {hbase.scan.max.result.size}
+   */
+  public static boolean isAllowableResultSize(long input) {
+    return input > 0 && input <= getMaxResultSize();
+  }
+
+  /**
+   * Returns the configured default result size in bytes.
+   *
+   * @return long
+   */
+  public static long getDefaultResultSize() {
+    float value = ConfigurationUtil.getConfiguration().getFloat(
+        "hbase.scan.default.result.size");
+    return convertToBytes(value, getResultSizeUnit());
+  }
+
+  /**
+   * Returns the configured max result size in bytes.
+   *
+   * @return long
+   */
+  public static long getMaxResultSize() {
+    float value = ConfigurationUtil.getConfiguration().getFloat(
+        "hbase.scan.max.result.size");
+    return convertToBytes(value, getResultSizeUnit());
+  }
+
+  /**
+   * Returns the configured max row size in bytes.
+   *
+   * @return long
+   */
+  public static long getMaxRowSize() {
+    float maxRowSize = ConfigurationUtil.getConfiguration().getFloat(
+        "hbase.table.max.row.size");
+    return convertToBytes(maxRowSize, getRowSizeUnit());
+  }
+
+  /**
+   * Gets the unit of {hbase.scan.*.result.size} values.
+   *
+   * @return the result size unit
+   */
+  public static SizeUnit getResultSizeUnit() {
+    return SizeUnit.valueOf(ConfigurationUtil.getConfiguration()
+        .getString("hbase.scan.result.size.unit"));
+  }
+
+  /**
+   * Gets the unit of the {hbase.table.max.row.size} value.
+   *
+   * @return the row size unit
+   */
+  public static SizeUnit getRowSizeUnit() {
+    return SizeUnit.valueOf(ConfigurationUtil.getConfiguration()
+        .getString("hbase.table.row.size.unit"));
+  }
+
+  /**
+   * Gets the connection retry limit.
+   *
+   * @return the connection retry limit
+   */
+  public static int getConnectionRetryLimit() {
+    return ConfigurationUtil.getConfiguration().getInt(
+        "hbase.hconnection.retries.number",
+        DEFAULT_HCONNECTION_RETRY_LIMIT);
+  }
+
+  /**
+   * Checks whether reverse traffic is included by default.
+   *
+   * @return true, if {pcaps.include.reverse.traffic} is set
+   */
+  public static boolean isDefaultIncludeReverseTraffic() {
+    return ConfigurationUtil.getConfiguration().getBoolean(
+        "pcaps.include.reverse.traffic");
+  }
+
+  /**
+   * Gets the HBase table name as bytes.
+   *
+   * @return the table name
+   */
+  public static byte[] getTableName() {
+    return Bytes.toBytes(ConfigurationUtil.getConfiguration().getString(
+        "hbase.table.name"));
+  }
+
+  /**
+   * Gets the column family as bytes.
+   *
+   * @return the column family
+   */
+  public static byte[] getColumnFamily() {
+    return Bytes.toBytes(ConfigurationUtil.getConfiguration().getString(
+        "hbase.table.column.family"));
+  }
+
+  /**
+   * Gets the column qualifier as bytes.
+   *
+   * @return the column qualifier
+   */
+  public static byte[] getColumnQualifier() {
+    return Bytes.toBytes(ConfigurationUtil.getConfiguration().getString(
+        "hbase.table.column.qualifier"));
+  }
+
+  /**
+   * Gets the max versions to read per column.
+   *
+   * @return the max versions
+   */
+  public static int getMaxVersions() {
+    return ConfigurationUtil.getConfiguration().getInt(
+        "hbase.table.column.maxVersions");
+  }
+
+  /**
+   * Gets the configured number of tokens in a row key.
+   *
+   * @return the configured tokens in rowkey
+   */
+  public static int getConfiguredTokensInRowkey() {
+    return ConfigurationUtil.getConfiguration().getInt(
+        "hbase.table.row.key.tokens");
+  }
+
+  /**
+   * Gets the minimum number of tokens required in an input key.
+   *
+   * @return the minimum tokens in inputkey
+   */
+  public static int getMinimumTokensInInputkey() {
+    return ConfigurationUtil.getConfiguration().getInt(
+        "rest.api.input.key.min.tokens");
+  }
+
+  /**
+   * Gets the number of digits appended to row-key tokens.
+   *
+   * @return the appending token digits
+   */
+  public static int getAppendingTokenDigits() {
+    return ConfigurationUtil.getConfiguration().getInt(
+        "hbase.table.row.key.token.appending.digits");
+  }
+
+  /**
+   * Converts a value in the given unit to bytes.
+   *
+   * @param value
+   *          the value, in 'unit'
+   * @param unit
+   *          the unit of 'value' (KB or MB); any other value is treated as
+   *          bytes already
+   * @return the value in bytes, truncated to a long
+   */
+  public static long convertToBytes(float value, SizeUnit unit) {
+    if (SizeUnit.KB == unit) {
+      return (long) (value * 1024);
+    }
+    if (SizeUnit.MB == unit) {
+      return (long) (value * 1024 * 1024);
+    }
+    return (long) value;
+  }
+
+  /**
+   * Debug entry point: prints the configured max row/result sizes and units.
+   *
+   * @param args
+   *          the arguments (unused)
+   */
+  public static void main(String[] args) {
+    long r1 = getMaxRowSize();
+    System.out.println("getMaxRowSizeInBytes = " + r1);
+    long r2 = getMaxResultSize();
+    System.out.println("getMaxAllowableResultSizeInBytes = " + r2);
+
+    SizeUnit u1 = getRowSizeUnit();
+    System.out.println("getMaxRowSizeUnit = " + u1.toString());
+    SizeUnit u2 = getResultSizeUnit();
+    System.out.println("getMaxAllowableResultsSizeUnit = " + u2.toString());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/HBaseConfigConstants.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/HBaseConfigConstants.java b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/HBaseConfigConstants.java
new file mode 100644
index 0000000..a7e7e3b
--- /dev/null
+++ b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/HBaseConfigConstants.java
@@ -0,0 +1,40 @@
+package com.opensoc.pcapservice;
+
+/**
+ * HBase configuration property names and row-key constants shared by the
+ * pcap-service classes.
+ *
+ * @author Sayi
+ */
+public class HBaseConfigConstants {
+
+  /** Property name for the ZooKeeper quorum hosts used by the HBase client. */
+  public static final String HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
+
+  /** Property name for the ZooKeeper client port. */
+  public static final String HBASE_ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.clientPort";
+
+  /** Property name for the ZooKeeper session timeout. */
+  public static final String HBASE_ZOOKEEPER_SESSION_TIMEOUT = "zookeeper.session.timeout";
+
+  /** Property name for the ZooKeeper recovery retry count. */
+  public static final String HBASE_ZOOKEEPER_RECOVERY_RETRY = "zookeeper.recovery.retry";
+
+  /** Property name for the HBase client retry count. */
+  public static final String HBASE_CLIENT_RETRIES_NUMBER = "hbase.client.retries.number";
+
+  // NOTE(review): 'delimeter' is a misspelling of 'delimiter', kept as-is for
+  // compatibility; this package-private instance field appears unused here —
+  // TODO confirm against callers before removing.
+  String delimeter = "-";
+
+  // Regex form of the delimiter ('-' escaped); also package-private and
+  // apparently unused in this file — TODO confirm against callers.
+  String regex = "\\-";
+
+  /** Token separator used inside pcap row keys. */
+  public static final String PCAP_KEY_DELIMETER = "-";
+
+  /** Request-parameter name for the start of a row-key range scan. */
+  public static final String START_KEY = "startKey";
+
+  /** Request-parameter name for the end of a row-key range scan. */
+  public static final String END_KEY = "endKey";
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/HBaseConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/HBaseConfigurationUtil.java b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/HBaseConfigurationUtil.java
new file mode 100644
index 0000000..8a5c022
--- /dev/null
+++ b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/HBaseConfigurationUtil.java
@@ -0,0 +1,165 @@
+/**
+ *
+ */
+package com.opensoc.pcapservice;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.log4j.Logger;
+import org.mortbay.log.Log;
+
+/**
+ * Utility class which creates HConnection instance when the first request is
+ * received and registers a shut down hook which closes the connection when the
+ * JVM exits. Creates new connection to the cluster only if the existing
+ * connection is closed for unknown reasons. Also creates Configuration with
+ * HBase resources using configuration properties.
+ *
+ * @author Sayi
+ *
+ */
+public class HBaseConfigurationUtil {
+
+ /** The Constant LOGGER. */
+ private static final Logger LOGGER = Logger
+ .getLogger(HBaseConfigurationUtil.class);
+
+ /** Configuration which holds all HBase properties. */
+ private static Configuration config;
+
+ /**
+ * A cluster connection which knows how to find master node and locate regions
+ * on the cluster.
+ */
+ private static HConnection clusterConnection = null;
+
+ /**
+ * Creates HConnection instance when the first request is received and returns
+ * the same instance for all subsequent requests if the connection is still
+ * open.
+ *
+ * @return HConnection instance
+ * @throws IOException
+ * Signals that an I/O exception has occurred.
+ */
+ public static HConnection getConnection() throws IOException {
+ if (!connectionAvailable()) {
+ synchronized (HBaseConfigurationUtil.class) {
+ createClusterConncetion();
+ }
+ }
+ return clusterConnection;
+ }
+
+ /**
+ * Creates the cluster conncetion.
+ *
+ * @throws IOException
+ * Signals that an I/O exception has occurred.
+ */
+ private static void createClusterConncetion() throws IOException {
+ try {
+ if (connectionAvailable()) {
+ return;
+ }
+ clusterConnection = HConnectionManager.createConnection(read());
+ addShutdownHook();
+ System.out.println("Created HConnection and added shutDownHook");
+ } catch (IOException e) {
+ LOGGER
+ .error(
+ "Exception occurred while creating HConnection using HConnectionManager",
+ e);
+ throw e;
+ }
+ }
+
+ /**
+ * Connection available.
+ *
+ * @return true, if successful
+ */
+ private static boolean connectionAvailable() {
+ if (clusterConnection == null) {
+ System.out.println("clusterConnection=" + clusterConnection);
+ return false;
+ }
+ System.out.println("clusterConnection.isClosed()="
+ + clusterConnection.isClosed());
+ return clusterConnection != null && !clusterConnection.isClosed();
+ }
+
+ /**
+ * Adds the shutdown hook.
+ */
+ private static void addShutdownHook() {
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ public void run() {
+ System.out
+ .println("Executing ShutdownHook HBaseConfigurationUtil : Closing HConnection");
+ try {
+ clusterConnection.close();
+ } catch (IOException e) {
+ Log.debug("Caught ignorable exception ", e);
+ }
+ }
+ }, "HBaseConfigurationUtilShutDown"));
+ }
+
+ /**
+ * Closes the underlying connection to cluster; ignores if any exception is
+ * thrown.
+ */
+ public static void closeConnection() {
+ if (clusterConnection != null) {
+ try {
+ clusterConnection.close();
+ } catch (IOException e) {
+ Log.debug("Caught ignorable exception ", e);
+ }
+ }
+ }
+
+ /**
+ * This method creates Configuration with HBase resources using configuration
+ * properties. The same Configuration object will be used to communicate with
+ * all HBase tables;
+ *
+ * @return Configuration object
+ */
+ public static Configuration read() {
+ if (config == null) {
+ synchronized (HBaseConfigurationUtil.class) {
+ if (config == null) {
+ config = HBaseConfiguration.create();
+
+ config.set(
+ HBaseConfigConstants.HBASE_ZOOKEEPER_QUORUM,
+ ConfigurationUtil.getConfiguration().getString(
+ "hbase.zookeeper.quorum"));
+ config.set(
+ HBaseConfigConstants.HBASE_ZOOKEEPER_CLIENT_PORT,
+ ConfigurationUtil.getConfiguration().getString(
+ "hbase.zookeeper.clientPort"));
+ config.set(
+ HBaseConfigConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ ConfigurationUtil.getConfiguration().getString(
+ "hbase.client.retries.number"));
+ config.set(
+ HBaseConfigConstants.HBASE_ZOOKEEPER_SESSION_TIMEOUT,
+ ConfigurationUtil.getConfiguration().getString(
+ "zookeeper.session.timeout"));
+ config.set(
+ HBaseConfigConstants.HBASE_ZOOKEEPER_RECOVERY_RETRY,
+ ConfigurationUtil.getConfiguration().getString(
+ "zookeeper.recovery.retry"));
+ }
+ }
+ }
+ return config;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/IPcapGetter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/IPcapGetter.java b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/IPcapGetter.java
new file mode 100644
index 0000000..dbff59c
--- /dev/null
+++ b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/IPcapGetter.java
@@ -0,0 +1,88 @@
+/**
+ *
+ */
+package com.opensoc.pcapservice;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Contract for all row-key based pcap retrieval operations.
+ *
+ * @author Sayi
+ */
+public interface IPcapGetter {
+
+  /**
+   * Fetches pcaps for the given keys, optionally resuming after the last
+   * row key of a previous partial response.
+   *
+   * @param keys list of keys for which pcaps are to be retrieved
+   * @param lastRowKey last row key from the previous partial response
+   * @param startTime start time in system milliseconds used to filter the
+   *          pcaps; treated as 0 when negative
+   * @param endTime end time in system milliseconds used to filter the
+   *          pcaps; treated as Long.MAX_VALUE when negative; must be
+   *          greater than 'startTime'
+   * @param includeReverseTraffic whether pcaps from the reverse traffic
+   *          are included
+   * @param includeDuplicateLastRow whether the last row of the previous
+   *          partial response is included again
+   * @param maxResultSize the maximum result size
+   * @return PcapsResponse with all matching pcaps merged together
+   * @throws IOException if an I/O error occurs while fetching
+   */
+  PcapsResponse getPcaps(List<String> keys, String lastRowKey,
+      long startTime, long endTime, boolean includeReverseTraffic,
+      boolean includeDuplicateLastRow, long maxResultSize) throws IOException;
+
+  /**
+   * Fetches pcaps for a single key within the given time window.
+   *
+   * @param key the key for which pcaps are to be retrieved
+   * @param startTime start time in system milliseconds; treated as 0 when
+   *          negative
+   * @param endTime end time in system milliseconds; treated as
+   *          Long.MAX_VALUE when negative; must be greater than 'startTime'
+   * @param includeReverseTraffic whether pcaps from the reverse traffic
+   *          are included
+   * @return PcapsResponse with all matching pcaps merged together
+   * @throws IOException if an I/O error occurs while fetching
+   */
+  PcapsResponse getPcaps(String key, long startTime, long endTime,
+      boolean includeReverseTraffic) throws IOException;
+
+  /**
+   * Fetches pcaps for the given keys using default options.
+   *
+   * @param keys list of keys for which pcaps are to be retrieved
+   * @return PcapsResponse with all matching pcaps merged together
+   * @throws IOException if an I/O error occurs while fetching
+   */
+  PcapsResponse getPcaps(List<String> keys) throws IOException;
+
+  /**
+   * Fetches pcaps for a single key using default options.
+   *
+   * @param key the key for which pcaps are to be retrieved
+   * @return PcapsResponse with all matching pcaps merged together
+   * @throws IOException if an I/O error occurs while fetching
+   */
+  PcapsResponse getPcaps(String key) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/IPcapScanner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/IPcapScanner.java b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/IPcapScanner.java
new file mode 100644
index 0000000..64408e9
--- /dev/null
+++ b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/IPcapScanner.java
@@ -0,0 +1,49 @@
+package com.opensoc.pcapservice;
+
+import java.io.IOException;
+
+/**
+ * Contract for fetching pcaps by row-key range.
+ */
+public interface IPcapScanner {
+
+  /**
+   * Fetches pcaps whose row keys lie between startKey (inclusive) and
+   * endKey (exclusive), filtered by the given time window.
+   *
+   * @param startKey start of the key range (inclusive)
+   * @param endKey end of the key range (exclusive)
+   * @param maxResponseSize maximum response size in megabytes (MB); must
+   *          be positive and less than 60
+   * @param startTime start time in system milliseconds used to filter the
+   *          pcaps; treated as 0 when negative
+   * @param endTime end time in system milliseconds used to filter the
+   *          pcaps; treated as Long.MAX_VALUE when negative
+   * @return byte array with all matching pcaps merged together
+   * @throws IOException if an I/O error occurs while fetching
+   */
+  byte[] getPcaps(String startKey, String endKey, long maxResponseSize,
+      long startTime, long endTime) throws IOException;
+
+  /**
+   * Fetches pcaps whose row keys lie between startKey (inclusive) and
+   * endKey (exclusive) using default limits.
+   *
+   * @param startKey start of the key range (inclusive)
+   * @param endKey end of the key range (exclusive)
+   * @return byte array with all matching pcaps merged together
+   * @throws IOException if an I/O error occurs while fetching
+   */
+  byte[] getPcaps(String startKey, String endKey) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/PcapGetterHBaseImpl.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/PcapGetterHBaseImpl.java b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/PcapGetterHBaseImpl.java
new file mode 100644
index 0000000..b06137d
--- /dev/null
+++ b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/PcapGetterHBaseImpl.java
@@ -0,0 +1,809 @@
+package com.opensoc.pcapservice;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Resource;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Response;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.NoServerForRegionException;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Logger;
+import org.springframework.util.Assert;
+import org.springframework.util.CollectionUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Singleton class which integrates with HBase table and returns pcaps sorted by
+ * timestamp(dsc) for the given list of keys. Creates HConnection if it is not
+ * already created and the same connection instance is being used for all
+ * requests
+ *
+ * @author sheetal
+ * @version $Revision: 1.0 $
+ */
+
+@Path("/")
+public class PcapGetterHBaseImpl implements IPcapGetter {
+
+ /** Lazily-initialized singleton instance; created and read by getInstance(). */
+ // NOTE(review): not volatile — safe publication depends entirely on how
+ // getInstance() synchronizes access.
+ private static IPcapGetter pcapGetterHBase = null;
+
+ /** Class logger. */
+ private static final Logger LOGGER = Logger
+ .getLogger(PcapGetterHBaseImpl.class);
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see com.cisco.opensoc.hbase.client.IPcapGetter#getPcaps(java.util.List,
+ * java.lang.String, long, long, boolean, boolean, long)
+ */
+
+
+ @GET
+ @Path("pcap/test")
+ @Produces("text/html")
+ public Response index() throws URISyntaxException {
+   // Liveness probe endpoint: a fixed body confirms the service is up.
+   final String body = "ALL GOOD";
+   return Response.ok(body).build();
+ }
+
+
+ /**
+  * Fetches pcaps for the given keys, first resuming any previous partial
+  * response identified by 'lastRowKey', then scanning the remaining keys
+  * in ascending order until done or the size limit produces another
+  * partial response.
+  *
+  * <p>checkIfValidInput permits 'keys' to be null/empty as long as
+  * 'lastRowKey' is supplied, so that case must not dereference 'keys'
+  * (the previous code NPE'd on keys.toString() and then failed the
+  * notEmpty assertion inside sortKeysByAscOrder).</p>
+  */
+ public PcapsResponse getPcaps(List<String> keys, String lastRowKey,
+     long startTime, long endTime, boolean includeReverseTraffic,
+     boolean includeDuplicateLastRow, long maxResultSize) throws IOException {
+   Assert
+       .isTrue(
+           checkIfValidInput(keys, lastRowKey),
+           "No valid input. One of the value must be present from {keys, lastRowKey}");
+   // String concatenation handles a null 'keys' safely.
+   LOGGER.info(" keys=" + keys + "; lastRowKey=" + lastRowKey);
+
+   PcapsResponse pcapsResponse = new PcapsResponse();
+   // 1. Resume the previous partial response from its last row key.
+   if (StringUtils.isNotEmpty(lastRowKey)) {
+     pcapsResponse = processKey(pcapsResponse, lastRowKey, startTime,
+         endTime, true, includeDuplicateLastRow, maxResultSize);
+     if (pcapsResponse.getStatus() == PcapsResponse.Status.PARTIAL) {
+       return pcapsResponse;
+     }
+   }
+   // 2. Process the input keys, if any were supplied (they may legally be
+   // absent when resuming via lastRowKey only).
+   if (CollectionUtils.isEmpty(keys)) {
+     return pcapsResponse;
+   }
+   List<String> sortedKeys = sortKeysByAscOrder(keys, includeReverseTraffic);
+   List<String> unprocessedKeys = sortedKeys;
+   if (StringUtils.isNotEmpty(lastRowKey)) {
+     // Skip keys already covered by the previous (partial) response.
+     unprocessedKeys = getUnprocessedSublistOfKeys(sortedKeys, lastRowKey);
+   }
+   LOGGER.info("unprocessedKeys in getPcaps" + unprocessedKeys.toString());
+   for (String unprocessedKey : unprocessedKeys) {
+     pcapsResponse = processKey(pcapsResponse, unprocessedKey, startTime,
+         endTime, false, includeDuplicateLastRow, maxResultSize);
+     if (pcapsResponse.getStatus() == PcapsResponse.Status.PARTIAL) {
+       return pcapsResponse;
+     }
+   }
+   return pcapsResponse;
+ }
+
+ /*
+  * (non-Javadoc)
+  *
+  * @see com.cisco.opensoc.hbase.client.IPcapGetter#getPcaps(java.lang.String, long,
+  * long, boolean)
+  */
+ public PcapsResponse getPcaps(String key, long startTime, long endTime,
+     boolean includeReverseTraffic) throws IOException {
+   Assert.hasText(key, "key must not be null or empty");
+   // Delegate to the list-based variant; no resume key, no duplicate last
+   // row, configured default result size.
+   List<String> singleKey = Arrays.asList(key);
+   long defaultResultSize = ConfigurationUtil.getDefaultResultSize();
+   return getPcaps(singleKey, null, startTime, endTime,
+       includeReverseTraffic, false, defaultResultSize);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see com.cisco.opensoc.hbase.client.IPcapGetter#getPcaps(java.util.List)
+ */
+
+ public PcapsResponse getPcaps(List<String> keys) throws IOException {
+ Assert.notEmpty(keys, "'keys' must not be null or empty");
+ return getPcaps(keys, null, -1, -1,
+ ConfigurationUtil.isDefaultIncludeReverseTraffic(), false,
+ ConfigurationUtil.getDefaultResultSize());
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see com.cisco.opensoc.hbase.client.IPcapGetter#getPcaps(java.lang.String)
+ */
+
+ public PcapsResponse getPcaps(String key) throws IOException {
+ Assert.hasText(key, "key must not be null or empty");
+ return getPcaps(Arrays.asList(key), null, -1, -1,
+ ConfigurationUtil.isDefaultIncludeReverseTraffic(), false,
+ ConfigurationUtil.getDefaultResultSize());
+ }
+
+ /**
+  * Returns the singleton instance, creating it on first use.
+  *
+  * <p>Synchronized on the class: the previous double-checked locking used
+  * the non-volatile 'pcapGetterHBase' field, which is not a safe
+  * publication idiom. Construction is trivial, so the lock is cheap.</p>
+  *
+  * @return IPcapGetter singleton instance
+  * @throws IOException
+  *           Signals that an I/O exception has occurred.
+  */
+ public static synchronized IPcapGetter getInstance() throws IOException {
+   if (pcapGetterHBase == null) {
+     pcapGetterHBase = new PcapGetterHBaseImpl();
+   }
+   return pcapGetterHBase;
+ }
+
+ /**
+  * Private constructor to enforce the singleton; obtain instances via
+  * {@link #getInstance()}.
+  */
+ private PcapGetterHBaseImpl() {
+ }
+
+ /**
+  * Returns the de-duplicated keys in ascending order, adding the reverse
+  * key of every input when 'includeReverseTraffic' is set.
+  *
+  * <p>Works on a copy of the input: the caller's list may be unmodifiable
+  * (the single-key overloads pass Arrays.asList, which rejects addAll with
+  * UnsupportedOperationException) and must not be mutated as a side
+  * effect.</p>
+  *
+  * @param keys
+  *          input keys
+  * @param includeReverseTraffic
+  *          flag whether or not to include reverse traffic
+  * @return List<String>
+  */
+ @VisibleForTesting
+ List<String> sortKeysByAscOrder(List<String> keys,
+     boolean includeReverseTraffic) {
+   Assert.notEmpty(keys, "'keys' must not be null");
+   List<String> workingKeys = new ArrayList<String>(keys);
+   if (includeReverseTraffic) {
+     workingKeys.addAll(PcapHelper.reverseKey(keys));
+   }
+   List<String> deDupKeys = removeDuplicateKeys(workingKeys);
+   Collections.sort(deDupKeys);
+   return deDupKeys;
+ }
+
+ /**
+ * Removes the duplicate keys.
+ *
+ * @param keys
+ * the keys
+ * @return the list
+ */
+ @VisibleForTesting
+public
+ List<String> removeDuplicateKeys(List<String> keys) {
+ Set<String> set = new HashSet<String>(keys);
+ return new ArrayList<String>(set);
+ }
+
+ /**
+  * Returns the portion of 'keys' that still needs processing: everything
+  * after the key matching the first five tokens of 'lastRowKey'. When no
+  * key matches, the complete list is returned.
+  *
+  * <pre>
+  * Eg :
+  * keys = [18800006-1800000b-06-0019-caac, 18800006-1800000b-06-0050-5af6, 18800006-1800000b-11-0035-3810]
+  * lastRowKey = "18800006-1800000b-06-0019-caac-65140-40815"
+  * and the response from this method [18800006-1800000b-06-0050-5af6, 18800006-1800000b-11-0035-3810]
+  * </pre>
+  *
+  * @param keys
+  *          keys
+  * @param lastRowKey
+  *          last row key of the previous partial response
+  * @return List<String>
+  */
+ @VisibleForTesting
+ List<String> getUnprocessedSublistOfKeys(List<String> keys,
+     String lastRowKey) {
+   Assert.notEmpty(keys, "'keys' must not be null");
+   Assert.hasText(lastRowKey, "'lastRowKey' must not be null");
+   String partialKey = getTokens(lastRowKey, 5);
+   // Start just past the first match; indexOf yields -1 on no match, so
+   // the sublist then spans the whole list.
+   int startIndex = keys.indexOf(partialKey) + 1;
+   return keys.subList(startIndex, keys.size());
+ }
+
+ /**
+  * Returns the first 'noOfTokens' tokens of the given key, re-joined with
+  * the pcap key delimiter ("-").
+  *
+  * @param key
+  *          given key
+  * @param noOfTokens
+  *          number of leading tokens to keep; must be smaller than the
+  *          total token count of the key
+  * @return the joined leading tokens
+  */
+ @VisibleForTesting
+ String getTokens(String key, int noOfTokens) {
+   String delimeter = HBaseConfigConstants.PCAP_KEY_DELIMETER;
+   String[] keyTokens = key.split("\\" + delimeter);
+   Assert.isTrue(noOfTokens < keyTokens.length,
+       "Invalid value for 'noOfTokens'");
+   if (noOfTokens <= 0) {
+     return "";
+   }
+   StringBuilder joined = new StringBuilder(keyTokens[0]);
+   for (int i = 1; i < noOfTokens; i++) {
+     joined.append(delimeter).append(keyTokens[i]);
+   }
+   return joined.toString();
+ }
+
+ /**
+  * Fetches the pcaps for one key and merges them into 'pcapsResponse'.
+  * Builds start/stop row keys for the key; when that returns null (the key
+  * already names a full row and is not a resumed partial response) a Get
+  * is issued, otherwise a Scan. Connection-level failures trigger a
+  * bounded retry after closing the shared connection.
+  *
+  * @param pcapsResponse
+  *          the response being accumulated across keys
+  * @param key
+  *          the key to fetch
+  * @param startTime
+  *          the start time filter
+  * @param endTime
+  *          the end time filter
+  * @param isPartialResponse
+  *          whether 'key' is the lastRowKey of a previous partial response
+  * @param includeDuplicateLastRow
+  *          whether to re-include the last row of the previous response
+  * @param maxResultSize
+  *          the max result size
+  * @return the pcaps response
+  * @throws IOException
+  *           Signals that an I/O exception has occurred.
+  */
+ @VisibleForTesting
+ PcapsResponse processKey(PcapsResponse pcapsResponse, String key,
+ long startTime, long endTime, boolean isPartialResponse,
+ boolean includeDuplicateLastRow, long maxResultSize) throws IOException {
+ HTable table = null;
+ // NOTE(review): 'table' is passed by value to executeGetRequest /
+ // executeScanRequest, which reassign only their own copy; it stays null
+ // here, so the close() in the finally block below never runs and the
+ // HTable opened inside those helpers is never closed from this method.
+ Scan scan = null;
+ List<Cell> scannedCells = null;
+ try {
+ // 1. Create start and stop row for the key;
+ Map<String, String> keysMap = createStartAndStopRowKeys(key,
+ isPartialResponse, includeDuplicateLastRow);
+
+ // 2. if the input key contains all fragments (7) and it is not part
+ // of previous partial response (isPartialResponse),
+ // 'keysMap' will be null; do a Get; currently not doing any
+ // response size related checks for Get;
+ // by default all cells from a specific row are sorted by timestamp
+ if (keysMap == null) {
+ Get get = createGetRequest(key, startTime, endTime);
+ List<Cell> cells = executeGetRequest(table, get);
+ for (Cell cell : cells) {
+ pcapsResponse.addPcaps(CellUtil.cloneValue(cell));
+ }
+ return pcapsResponse;
+ }
+ // 3. Create and execute Scan request
+ scan = createScanRequest(pcapsResponse, keysMap, startTime, endTime,
+ maxResultSize);
+ scannedCells = executeScanRequest(table, scan);
+ LOGGER.info("scannedCells size :" + scannedCells.size());
+ addToResponse(pcapsResponse, scannedCells, maxResultSize);
+
+ } catch (IOException e) {
+ LOGGER.error("Exception occurred while fetching Pcaps for the keys :"
+ + key, e);
+ // Retry only connection-level failures; anything else propagates.
+ if (e instanceof ZooKeeperConnectionException
+ || e instanceof MasterNotRunningException
+ || e instanceof NoServerForRegionException) {
+ int maxRetryLimit = ConfigurationUtil.getConnectionRetryLimit();
+ System.out.println("maxRetryLimit =" + maxRetryLimit);
+ for (int attempt = 1; attempt <= maxRetryLimit; attempt++) {
+ System.out.println("attempting =" + attempt);
+ try {
+ HBaseConfigurationUtil.closeConnection(); // closing the
+ // existing
+ // connection
+ // and retry,
+ // it will
+ // create a new
+ // HConnection
+ // NOTE(review): the retry always re-runs the Scan; if the failure
+ // came from the Get path above, 'scan' is still null here — confirm
+ // whether those exception types can be raised from the Get path.
+ scannedCells = executeScanRequest(table, scan);
+ addToResponse(pcapsResponse, scannedCells, maxResultSize);
+ break;
+ } catch (IOException ie) {
+ // Swallow intermediate failures; rethrow the original exception
+ // only after the final attempt.
+ if (attempt == maxRetryLimit) {
+ LOGGER.error("Throwing the exception after retrying "
+ + maxRetryLimit + " times.");
+ throw e;
+ }
+ }
+ }
+ }
+
+ } finally {
+ if (table != null) {
+ table.close();
+ }
+ }
+ return pcapsResponse;
+ }
+
+ /**
+  * Sorts the scanned cells by timestamp, appends their values to the
+  * response, and marks the response PARTIAL (recording the last scanned
+  * row key) when the configured size limit is exceeded.
+  *
+  * <p>Returns immediately when nothing was scanned: the previous code
+  * could reach setLastRowKey(new String(null)) — an NPE — when the scan
+  * was empty but the accumulated response already exceeded the limit.</p>
+  *
+  * @param pcapsResponse
+  *          the pcaps response
+  * @param scannedCells
+  *          the scanned cells
+  * @param maxResultSize
+  *          the max result size
+  */
+ private void addToResponse(PcapsResponse pcapsResponse,
+     List<Cell> scannedCells, long maxResultSize) {
+   if (scannedCells == null || scannedCells.isEmpty()) {
+     return; // nothing scanned: nothing to add, no last row key to record
+   }
+   String lastKeyFromCurrentScan = new String(CellUtil.cloneRow(scannedCells
+       .get(scannedCells.size() - 1)));
+   // Merge in timestamp order, then check the accumulated response size.
+   Collections.sort(scannedCells, PcapHelper.getCellTimestampComparator());
+   for (Cell sortedCell : scannedCells) {
+     pcapsResponse.addPcaps(CellUtil.cloneValue(sortedCell));
+   }
+   if (!pcapsResponse.isResonseSizeWithinLimit(maxResultSize)) {
+     pcapsResponse.setStatus(PcapsResponse.Status.PARTIAL);
+     pcapsResponse.setLastRowKey(lastKeyFromCurrentScan);
+   }
+ }
+
+ /**
+  * Builds start and stop row keys according to the following logic : 1.
+  * Creates tokens out of 'key' using pcap_id delimiter ('-') 2. if the input
+  * 'key' contains (assume : configuredTokensInRowKey=7 and
+  * minimumTokensIninputKey=5): a). 5 tokens
+  * ("srcIp-dstIp-protocol-srcPort-dstPort") startKey =
+  * "srcIp-dstIp-protocol-srcPort-dstPort-00000-00000" stopKey =
+  * "srcIp-dstIp-protocol-srcPort-dstPort-99999-99999" b). 6 tokens
+  * ("srcIp-dstIp-protocol-srcPort-dstPort-id1") startKey =
+  * "srcIp-dstIp-protocol-srcPort-dstPort-id1-00000" stopKey =
+  * "srcIp-dstIp-protocol-srcPort-dstPort-id1-99999"
+  *
+  * c). 7 tokens ("srcIp-dstIp-protocol-srcPort-dstPort-id1-id2") 1>. if the
+  * key is NOT part of the partial response from previous request, return
+  * 'null' 2>. if the key is part of partial response from previous request
+  * startKey = "srcIp-dstIp-protocol-srcPort-dstPort-id1-(id2+1)"; 1 is added
+  * to exclude this key as it was included in the previous request stopKey =
+  * "srcIp-dstIp-protocol-srcPort-dstPort-99999-99999"
+  *
+  * @param key
+  *          the key
+  * @param isLastRowKey
+  *          if the key is part of partial response
+  * @param includeDuplicateLastRow
+  *          the include duplicate last row
+  * @return Map<String, String> with START_KEY/END_KEY entries, or null
+  *         when the caller should issue a Get instead of a Scan
+  */
+ @VisibleForTesting
+ Map<String, String> createStartAndStopRowKeys(String key,
+ boolean isLastRowKey, boolean includeDuplicateLastRow) {
+ String delimeter = HBaseConfigConstants.PCAP_KEY_DELIMETER;
+ String regex = "\\" + delimeter;
+ String[] keyTokens = key.split(regex);
+
+ String startKey = null;
+ String endKey = null;
+ Map<String, String> map = new HashMap<String, String>();
+
+ int configuredTokensInRowKey = ConfigurationUtil
+ .getConfiguredTokensInRowkey();
+ int minimumTokensIninputKey = ConfigurationUtil
+ .getMinimumTokensInInputkey();
+ Assert
+ .isTrue(
+ minimumTokensIninputKey <= configuredTokensInRowKey,
+ "tokens in the input key (separated by '-'), must be less than or equal to the tokens used in hbase table row key ");
+ // in case if the input key contains 'configuredTokensInRowKey' tokens and
+ // it is NOT a
+ // partial response key, do a Get instead of Scan
+ if (keyTokens.length == configuredTokensInRowKey) {
+ if (!isLastRowKey) {
+ return null;
+ }
+ // it is a partial response key; 'startKey' is same as input partial
+ // response key; 'endKey' can be built by replacing
+ // (configuredTokensInRowKey - minimumTokensIninputKey) tokens
+ // of input partial response key with '99999'
+ // NOTE(review): this inner check is reachable only when
+ // configuredTokensInRowKey == minimumTokensIninputKey, since the
+ // enclosing branch requires keyTokens.length == configuredTokensInRowKey.
+ if (keyTokens.length == minimumTokensIninputKey) {
+ return null;
+ }
+ int appendingTokenSlots = configuredTokensInRowKey
+ - minimumTokensIninputKey;
+ if (appendingTokenSlots > 0) {
+ String partialKey = getTokens(key, minimumTokensIninputKey);
+ StringBuffer sbfStartNew = new StringBuffer(partialKey);
+ StringBuffer sbfEndNew = new StringBuffer(partialKey);
+ for (int i = 0; i < appendingTokenSlots; i++) {
+ // Last slot: either bump the token by 1 (exclude the already-returned
+ // row) or keep it (re-include the duplicate last row).
+ if (i == (appendingTokenSlots - 1)) {
+ if (!includeDuplicateLastRow) {
+ sbfStartNew
+ .append(HBaseConfigConstants.PCAP_KEY_DELIMETER)
+ .append(
+ Integer.valueOf(keyTokens[minimumTokensIninputKey + i]) + 1);
+ } else {
+ sbfStartNew.append(HBaseConfigConstants.PCAP_KEY_DELIMETER)
+ .append(keyTokens[minimumTokensIninputKey + i]);
+ }
+ } else {
+ sbfStartNew.append(HBaseConfigConstants.PCAP_KEY_DELIMETER).append(
+ keyTokens[minimumTokensIninputKey + i]);
+ }
+ sbfEndNew.append(HBaseConfigConstants.PCAP_KEY_DELIMETER).append(
+ getMaxLimitForAppendingTokens());
+ }
+ startKey = sbfStartNew.toString();
+ endKey = sbfEndNew.toString();
+ }
+ } else {
+ // Fewer tokens than a full row key: pad the missing slots with the
+ // minimum ("000..") for the start key and maximum ("999..") for the end.
+ StringBuffer sbfStart = new StringBuffer(key);
+ StringBuffer sbfEnd = new StringBuffer(key);
+ for (int i = keyTokens.length; i < configuredTokensInRowKey; i++) {
+ sbfStart.append(HBaseConfigConstants.PCAP_KEY_DELIMETER).append(
+ getMinLimitForAppendingTokens());
+ sbfEnd.append(HBaseConfigConstants.PCAP_KEY_DELIMETER).append(
+ getMaxLimitForAppendingTokens());
+ }
+ startKey = sbfStart.toString();
+ endKey = sbfEnd.toString();
+ }
+ map.put(HBaseConfigConstants.START_KEY, startKey);
+ map.put(HBaseConfigConstants.END_KEY, endKey);
+
+ return map;
+ }
+
+ /**
+  * The input is valid when at least one of {keys, lastRowKey} is present:
+  * returns false only when 'keys' is null/empty AND 'lastRowKey' is
+  * null/empty.
+  *
+  * @param keys
+  *          input row keys
+  * @param lastRowKey
+  *          partial response key
+  * @return boolean
+  */
+ @VisibleForTesting
+ boolean checkIfValidInput(List<String> keys, String lastRowKey) {
+   boolean noKeys = CollectionUtils.isEmpty(keys);
+   boolean noLastRowKey = StringUtils.isEmpty(lastRowKey);
+   return !(noKeys && noLastRowKey);
+ }
+
+ /**
+  * Opens the configured HBase table, executes the given Get, and returns
+  * the matching cells for the configured column family/qualifier.
+  *
+  * <p>The table is closed in a finally block: the 'table' parameter is
+  * passed by value, so the reassignment below is invisible to the caller
+  * and the caller cannot close it — previously the handle leaked.</p>
+  *
+  * @param table
+  *          unused incoming handle (kept for signature compatibility)
+  * @param get
+  *          Get
+  * @return List<Cell>
+  * @throws IOException
+  *           Signals that an I/O exception has occurred.
+  */
+ private List<Cell> executeGetRequest(HTable table, Get get)
+     throws IOException {
+   LOGGER.info("Get :" + get.toString());
+   table = (HTable) HBaseConfigurationUtil.getConnection().getTable(
+       ConfigurationUtil.getTableName());
+   try {
+     Result result = table.get(get);
+     return result.getColumnCells(
+         ConfigurationUtil.getColumnFamily(),
+         ConfigurationUtil.getColumnQualifier());
+   } finally {
+     table.close();
+   }
+ }
+
+ /**
+  * Opens the configured HBase table, executes the given Scan, and returns
+  * all cells of the configured column family/qualifier across the scanned
+  * rows.
+  *
+  * <p>Both the ResultScanner and the table are closed in finally blocks:
+  * the 'table' parameter is passed by value, so the reassignment below is
+  * invisible to the caller and the caller cannot close it — previously
+  * both handles leaked.</p>
+  *
+  * @param table
+  *          unused incoming handle (kept for signature compatibility)
+  * @param scan
+  *          the scan
+  * @return the list of matching cells
+  * @throws IOException
+  *           Signals that an I/O exception has occurred.
+  */
+ private List<Cell> executeScanRequest(HTable table, Scan scan)
+     throws IOException {
+   LOGGER.info("Scan :" + scan.toString());
+   table = (HTable) HBaseConfigurationUtil.getConnection().getTable(
+       ConfigurationUtil.getConfiguration().getString("hbase.table.name"));
+   try {
+     ResultScanner resultScanner = table.getScanner(scan);
+     try {
+       List<Cell> scannedCells = new ArrayList<Cell>();
+       for (Result result = resultScanner.next(); result != null;
+           result = resultScanner.next()) {
+         List<Cell> cells = result.getColumnCells(
+             ConfigurationUtil.getColumnFamily(),
+             ConfigurationUtil.getColumnQualifier());
+         if (cells != null) {
+           scannedCells.addAll(cells);
+         }
+       }
+       return scannedCells;
+     } finally {
+       resultScanner.close();
+     }
+   } finally {
+     table.close();
+   }
+ }
+
+ /**
+  * Builds a Get for the given row key, restricted to the configured column
+  * family/qualifier and max versions, with an optional time range.
+  *
+  * @param key
+  *          the row key to read
+  * @param startTime
+  *          requested start time filter
+  * @param endTime
+  *          requested end time filter
+  * @return the configured Get
+  * @throws IOException
+  *           Signals that an I/O exception has occurred.
+  */
+ @VisibleForTesting
+ Get createGetRequest(String key, long startTime, long endTime)
+     throws IOException {
+   Get get = new Get(Bytes.toBytes(key));
+   // Restrict to the configured family and its qualifier.
+   get.addFamily(ConfigurationUtil.getColumnFamily());
+   get.addColumn(ConfigurationUtil.getColumnFamily(),
+       ConfigurationUtil.getColumnQualifier());
+   // Cap how many versions of each cell are returned.
+   get.setMaxVersions(ConfigurationUtil.getMaxVersions());
+   // Apply the time window, if one was requested.
+   setTimeRangeOnGet(get, startTime, endTime);
+   return get;
+ }
+
+ /**
+  * Builds a Scan over the start/stop keys in 'keysMap', restricted to the
+  * configured column family/qualifier, capped by the remaining response
+  * budget, and filtered by the optional time range.
+  *
+  * @param pcapsResponse
+  *          the response accumulated so far (its size reduces the budget)
+  * @param keysMap
+  *          map holding START_KEY and END_KEY entries
+  * @param startTime
+  *          the start time
+  * @param endTime
+  *          the end time
+  * @param maxResultSize
+  *          the max result size
+  * @return the scan
+  * @throws IOException
+  *           Signals that an I/O exception has occurred.
+  */
+ @VisibleForTesting
+ Scan createScanRequest(PcapsResponse pcapsResponse,
+ Map<String, String> keysMap, long startTime, long endTime,
+ long maxResultSize) throws IOException {
+ Scan scan = new Scan();
+ // set column family, qualifier
+ scan.addColumn(ConfigurationUtil.getColumnFamily(),
+ ConfigurationUtil.getColumnQualifier());
+
+ // set start and stop keys
+ // NOTE(review): String.getBytes() uses the platform default charset;
+ // confirm row keys are ASCII-only or switch to an explicit charset.
+ scan.setStartRow(keysMap.get(HBaseConfigConstants.START_KEY).getBytes());
+ scan.setStopRow(keysMap.get(HBaseConfigConstants.END_KEY).getBytes());
+
+ // set max results size : remaining size = max results size - ( current
+ // pcaps response size + possible maximum row size)
+ long remainingSize = maxResultSize
+ - (pcapsResponse.getResponseSize() + ConfigurationUtil.getMaxRowSize());
+
+ if (remainingSize > 0) {
+ scan.setMaxResultSize(remainingSize);
+ }
+ // set max versions
+ // NOTE(review): createGetRequest reads this via
+ // ConfigurationUtil.getMaxVersions(); confirm both resolve to the same
+ // "hbase.table.column.maxVersions" value.
+ scan.setMaxVersions(ConfigurationUtil.getConfiguration().getInt(
+ "hbase.table.column.maxVersions"));
+
+ // set time range
+ setTimeRangeOnScan(scan, startTime, endTime);
+ return scan;
+ }
+
+ /**
+  * Applies the requested time window to the scan. When both bounds are
+  * negative, no range is set. A single negative bound defaults to 0
+  * (start) or Long.MAX_VALUE (end); non-negative bounds are converted to
+  * the data-creation time unit first.
+  *
+  * @param scan
+  *          the scan to configure
+  * @param startTime
+  *          the requested start time
+  * @param endTime
+  *          the requested end time
+  * @throws IOException
+  *           Signals that an I/O exception has occurred.
+  */
+ private void setTimeRangeOnScan(Scan scan, long startTime, long endTime)
+     throws IOException {
+   if (startTime < 0 && endTime < 0) {
+     return; // no time filtering requested
+   }
+   long effectiveStart = (startTime < 0) ? 0
+       : PcapHelper.convertToDataCreationTimeUnit(startTime);
+   long effectiveEnd = (endTime < 0) ? Long.MAX_VALUE
+       : PcapHelper.convertToDataCreationTimeUnit(endTime);
+   Assert.isTrue(effectiveStart < effectiveEnd,
+       "startTime value must be less than endTime value");
+   scan.setTimeRange(effectiveStart, effectiveEnd);
+ }
+
+ /**
+  * Applies the requested time window to the get. When both bounds are
+  * negative, no range is set. A single negative bound defaults to 0
+  * (start) or Long.MAX_VALUE (end); non-negative bounds are converted to
+  * the data-creation time unit first.
+  *
+  * @param get
+  *          the get to configure
+  * @param startTime
+  *          the requested start time
+  * @param endTime
+  *          the requested end time
+  * @throws IOException
+  *           Signals that an I/O exception has occurred.
+  */
+ private void setTimeRangeOnGet(Get get, long startTime, long endTime)
+     throws IOException {
+   if (startTime < 0 && endTime < 0) {
+     return; // no time filtering requested
+   }
+   long effectiveStart = (startTime < 0) ? 0
+       : PcapHelper.convertToDataCreationTimeUnit(startTime);
+   long effectiveEnd = (endTime < 0) ? Long.MAX_VALUE
+       : PcapHelper.convertToDataCreationTimeUnit(endTime);
+   Assert.isTrue(effectiveStart < effectiveEnd,
+       "startTime value must be less than endTime value");
+   get.setTimeRange(effectiveStart, effectiveEnd);
+ }
+
+ /**
+  * Builds the smallest possible appended token: '0' repeated for the
+  * configured number of digits.
+  *
+  * @return the min limit for appending tokens
+  */
+ private String getMinLimitForAppendingTokens() {
+   char[] digits = new char[ConfigurationUtil.getAppendingTokenDigits()];
+   Arrays.fill(digits, '0');
+   return new String(digits);
+ }
+
+ /**
+  * Builds the largest possible appended token: '9' repeated for the
+  * configured number of digits.
+  *
+  * @return the max limit for appending tokens
+  */
+ private String getMaxLimitForAppendingTokens() {
+   char[] digits = new char[ConfigurationUtil.getAppendingTokenDigits()];
+   Arrays.fill(digits, '9');
+   return new String(digits);
+ }
+
+ /**
+  * Command-line entry point: fetches pcaps for a comma-separated list of
+  * keys and writes the merged result to a file.
+  *
+  * <p>Arguments actually read below: args[1] = output file, args[2] =
+  * comma-separated keys, optional args[3]/args[4] = start/end time.</p>
+  *
+  * @param args
+  *          the arguments
+  *
+  * @throws IOException
+  *           Signals that an I/O exception has occurred.
+  */
+ public static void main(String[] args) throws IOException {
+   // args[2] (the key list) is the last mandatory argument, so at least
+   // three entries are required; the previous "< 2" check allowed an
+   // ArrayIndexOutOfBoundsException on args[2].
+   if (args == null || args.length < 3) {
+     usage();
+     return;
+   }
+   String outputFileName = args[1];
+   List<String> keys = Arrays.asList(StringUtils.split(args[2], ","));
+   System.out.println("Geting keys " + keys);
+   long startTime = 0;
+   long endTime = Long.MAX_VALUE;
+   if (args.length > 3) {
+     startTime = Long.valueOf(args[3]);
+   }
+   if (args.length > 4) {
+     endTime = Long.valueOf(args[4]);
+   }
+   System.out.println("With start time " + startTime + " and end time "
+       + endTime);
+   PcapGetterHBaseImpl downloader = new PcapGetterHBaseImpl();
+   PcapsResponse pcaps = downloader.getPcaps(keys, null, startTime, endTime,
+       false, false, 6);
+   File file = new File(outputFileName);
+   // Truncate any existing file, then append the merged pcap bytes.
+   FileUtils.write(file, "", false);
+   FileUtils.writeByteArrayToFile(file, pcaps.getPcaps(), true);
+ }
+
+ /**
+  * Prints command-line usage matching the arguments main() actually
+  * parses: output file, comma-separated keys, optional start/end times.
+  * (The previous message advertised "<start key> [stop key]", which main()
+  * never reads.)
+  */
+ private static void usage() {
+   System.out.println("java " + PcapGetterHBaseImpl.class.getName() // $codepro.audit.disable
+       // debuggingCode
+       + " <zk quorum> <output file> <keys (comma-separated)>"
+       + " [start time] [end time]");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/PcapHelper.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/PcapHelper.java b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/PcapHelper.java
new file mode 100644
index 0000000..5224945
--- /dev/null
+++ b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/PcapHelper.java
@@ -0,0 +1,205 @@
+package com.opensoc.pcapservice;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.mortbay.log.Log;
+import org.springframework.util.Assert;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * utility class which holds methods related to time conversions, building
+ * reverse keys.
+ */
+public class PcapHelper {
+
+ /** The Constant LOGGER. */
+ private static final Logger LOGGER = Logger.getLogger(PcapHelper.class);
+
+ /** Comparator for sorting pcap cells by timestamp; stateless, hence shared and final. */
+ private static final CellTimestampComparator CELL_TIMESTAMP_COMPARATOR = new CellTimestampComparator();
+
+ /**
+ * The supported time units of the 'hbase' data creation time.
+ */
+ public enum TimeUnit {
+
+ /** The seconds. */
+ SECONDS,
+ /** The millis. */
+ MILLIS,
+ /** The micros. */
+ MICROS,
+ /** The unknown. */
+ UNKNOWN
+ };
+
+ /**
+ * Converts the given time to the 'hbase' data creation time unit. The input
+ * unit is inferred from the digit count: up to 10 digits is treated as
+ * seconds, up to 13 as millis, up to 16 as micros; anything larger is
+ * returned unchanged.
+ *
+ * @param inputTime
+ * the input time
+ * @return the long
+ */
+ public static long convertToDataCreationTimeUnit(long inputTime) {
+ if (inputTime <= 9999999999L) {
+ return convertSecondsToDataCreationTimeUnit(inputTime); // <= 10 digits:
+ // seconds
+ } else if (inputTime <= 9999999999999L) {
+ return convertMillisToDataCreationTimeUnit(inputTime); // <= 13 digits:
+ // millis
+ } else if (inputTime <= 9999999999999999L) {
+ return convertMicrosToDataCreationTimeUnit(inputTime); // <= 16 digits:
+ // micros
+ }
+ return inputTime; // input time unit is unknown
+ }
+
+ /**
+ * Returns the 'hbase' data creation time unit by reading
+ * 'hbase.table.data.time.unit' property in 'hbase-config' properties file; If
+ * none is mentioned in properties file, returns <code>TimeUnit.UNKNOWN</code>
+ *
+ * @return TimeUnit
+ */
+ @VisibleForTesting
+ public static TimeUnit getDataCreationTimeUnit() {
+ String timeUnit = ConfigurationUtil.getConfiguration().getString(
+ "hbase.table.data.time.unit");
+ // null-safe concatenation: the property may be absent, in which case
+ // 'timeUnit' is null and calling toString() on it would throw NPE
+ LOGGER.debug("hbase.table.data.time.unit=" + timeUnit);
+ if (StringUtils.isNotEmpty(timeUnit)) {
+ return TimeUnit.valueOf(timeUnit);
+ }
+ return TimeUnit.UNKNOWN;
+ }
+
+ /**
+ * Convert seconds to data creation time unit.
+ *
+ * @param inputTime
+ * the input time
+ * @return the long
+ */
+ @VisibleForTesting
+ public static long convertSecondsToDataCreationTimeUnit(long inputTime) {
+ LOGGER.debug("convert Seconds To DataCreation TimeUnit");
+ TimeUnit dataCreationTimeUnit = getDataCreationTimeUnit();
+ if (TimeUnit.SECONDS == dataCreationTimeUnit) {
+ return inputTime;
+ } else if (TimeUnit.MILLIS == dataCreationTimeUnit) {
+ return inputTime * 1000;
+ } else if (TimeUnit.MICROS == dataCreationTimeUnit) {
+ return inputTime * 1000 * 1000;
+ }
+ return inputTime; // unit UNKNOWN: pass the value through unchanged
+ }
+
+ /**
+ * Builds the reverseKey to fetch the pcaps in the reverse traffic
+ * (destination to source).
+ *
+ * @param key
+ * indicates hbase rowKey (partial or full) in the format
+ * "srcAddr-dstAddr-protocol-srcPort-dstPort-fragment"
+ * @return String indicates the key in the format
+ * "dstAddr-srcAddr-protocol-dstPort-srcPort"; empty when the key is
+ * not in the expected format
+ */
+ public static String reverseKey(String key) {
+ Assert.hasText(key, "key must not be null or empty");
+ String delimeter = HBaseConfigConstants.PCAP_KEY_DELIMETER;
+ String regex = "\\" + delimeter;
+ StringBuilder sb = new StringBuilder();
+ try {
+ String[] tokens = key.split(regex);
+ Assert
+ .isTrue(
+ (tokens.length == 5 || tokens.length == 6 || tokens.length == 7),
+ "key is not in the format : 'srcAddr-dstAddr-protocol-srcPort-dstPort-{ipId-fragment identifier}'");
+ // swap source and destination address (tokens 0,1) and ports (tokens 3,4)
+ sb.append(tokens[1]).append(delimeter).append(tokens[0])
+ .append(delimeter).append(tokens[2]).append(delimeter)
+ .append(tokens[4]).append(delimeter).append(tokens[3]);
+ } catch (Exception e) {
+ // use the class logger (log4j) rather than mortbay's Log for consistency
+ LOGGER.warn("Failed to reverse the key. Reverse scan won't be performed.",
+ e);
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Builds the reverseKeys to fetch the pcaps in the reverse traffic
+ * (destination to source). If all keys in the input are not in the expected
+ * format, it returns an empty list;
+ *
+ * @param keys
+ * indicates list of hbase rowKeys (partial or full) in the format
+ * "srcAddr-dstAddr-protocol-srcPort-dstPort-fragment"
+ * @return List<String> indicates the list of keys in the format
+ * "dstAddr-srcAddr-protocol-dstPort-srcPort"
+ */
+ public static List<String> reverseKey(List<String> keys) {
+ Assert.notEmpty(keys, "'keys' must not be null or empty");
+ List<String> reverseKeys = new ArrayList<String>();
+ for (String key : keys) {
+ if (key != null) {
+ String reverseKey = reverseKey(key);
+ // reverseKey(String) returns "" for malformed keys; skip those
+ if (StringUtils.isNotEmpty(reverseKey)) {
+ reverseKeys.add(reverseKey);
+ }
+ }
+ }
+ return reverseKeys;
+ }
+
+ /**
+ * Returns Comparator for sorting pcaps cells based on the timestamp (dsc).
+ *
+ * @return CellTimestampComparator
+ */
+ public static CellTimestampComparator getCellTimestampComparator() {
+ return CELL_TIMESTAMP_COMPARATOR;
+ }
+
+ /**
+ * Convert millis to data creation time unit.
+ *
+ * @param inputTime
+ * the input time
+ * @return the long
+ */
+ @VisibleForTesting
+ private static long convertMillisToDataCreationTimeUnit(long inputTime) {
+ LOGGER.debug("convert Millis To DataCreation TimeUnit");
+ TimeUnit dataCreationTimeUnit = getDataCreationTimeUnit();
+ if (TimeUnit.SECONDS == dataCreationTimeUnit) {
+ return (inputTime / 1000);
+ } else if (TimeUnit.MILLIS == dataCreationTimeUnit) {
+ return inputTime;
+ } else if (TimeUnit.MICROS == dataCreationTimeUnit) {
+ return inputTime * 1000;
+ }
+ return inputTime; // unit UNKNOWN: pass the value through unchanged
+ }
+
+ /**
+ * Convert micros to data creation time unit.
+ *
+ * @param inputTime
+ * the input time
+ * @return the long
+ */
+ @VisibleForTesting
+ private static long convertMicrosToDataCreationTimeUnit(long inputTime) {
+ LOGGER.debug("convert Micros To DataCreation TimeUnit");
+ TimeUnit dataCreationTimeUnit = getDataCreationTimeUnit();
+ if (TimeUnit.SECONDS == dataCreationTimeUnit) {
+ return inputTime / (1000 * 1000);
+ } else if (TimeUnit.MILLIS == dataCreationTimeUnit) {
+ return inputTime / 1000;
+ } else if (TimeUnit.MICROS == dataCreationTimeUnit) {
+ return inputTime;
+ }
+ return inputTime; // unit UNKNOWN: pass the value through unchanged
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/PcapReceiverImplRestEasy.java b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
new file mode 100644
index 0000000..98e855e
--- /dev/null
+++ b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
@@ -0,0 +1,250 @@
+package com.opensoc.pcapservice;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.opensoc.pcap.PcapUtils;
+
+@Path("/")
+public class PcapReceiverImplRestEasy {
+
+ /** The Constant LOGGER. */
+ private static final Logger LOGGER = Logger
+ .getLogger(PcapReceiverImplRestEasy.class);
+
+ /** The Constant HEADER_CONTENT_DISPOSITION_NAME. */
+ private static final String HEADER_CONTENT_DISPOSITION_NAME = "Content-Disposition";
+
+ /** The Constant HEADER_CONTENT_DISPOSITION_VALUE. */
+ private static final String HEADER_CONTENT_DISPOSITION_VALUE = "attachment; filename=\"managed-threat.pcap\"";
+
+ /** partial response key header name. */
+ private static final String HEADER_PARTIAL_RESPONE_KEY = "lastRowKey";
+
+ @GET
+ @Path("pcapGetter/getPcapsByKeys")
+ public Response getPcapsByKeys(
+ @QueryParam("keys") List<String> keys,
+ @QueryParam("lastRowKey") String lastRowKey,
+ @DefaultValue("-1") @QueryParam("startTime") long startTime,
+ @DefaultValue("-1") @QueryParam("endTime") long endTime,
+ @QueryParam("includeDuplicateLastRow") boolean includeDuplicateLastRow,
+ @QueryParam("includeReverseTraffic") boolean includeReverseTraffic,
+ @QueryParam("maxResponseSize") String maxResponseSize,
+ @Context HttpServletResponse response) throws IOException {
+ PcapsResponse pcapResponse = null;
+
+ if (keys == null || keys.size() == 0)
+ return Response.serverError().status(Response.Status.NO_CONTENT)
+ .entity("'keys' must not be null or empty").build();
+
+ try {
+ IPcapGetter pcapGetter = PcapGetterHBaseImpl.getInstance();
+ pcapResponse = pcapGetter.getPcaps(parseKeys(keys), lastRowKey,
+ startTime, endTime, includeReverseTraffic,
+ includeDuplicateLastRow,
+ ConfigurationUtil.validateMaxResultSize(maxResponseSize));
+ LOGGER.info("pcaps response in REST layer ="
+ + pcapResponse.toString());
+
+ // return http status '204 No Content' if the pcaps response size is
+ // 0
+ if (pcapResponse == null || pcapResponse.getResponseSize() == 0) {
+
+ return Response.status(Response.Status.NO_CONTENT).build();
+ }
+
+ // return http status '206 Partial Content', the partial response
+ // file and
+ // 'lastRowKey' header , if the pcaps response status is 'PARTIAL'
+
+ response.setHeader(HEADER_CONTENT_DISPOSITION_NAME,
+ HEADER_CONTENT_DISPOSITION_VALUE);
+
+ if (pcapResponse.getStatus() == PcapsResponse.Status.PARTIAL) {
+
+ response.setHeader(HEADER_PARTIAL_RESPONE_KEY,
+ pcapResponse.getLastRowKey());
+
+ return Response
+ .ok(pcapResponse.getPcaps(),
+ MediaType.APPLICATION_OCTET_STREAM).status(206)
+ .build();
+
+ }
+
+ } catch (IOException e) {
+ LOGGER.error(
+ "Exception occurred while fetching Pcaps for the keys :"
+ + keys.toString(), e);
+ throw e;
+ }
+
+ // return http status '200 OK' along with the complete pcaps response
+ // file,
+ // and headers
+ // return new ResponseEntity<byte[]>(pcapResponse.getPcaps(), headers,
+ // HttpStatus.OK);
+
+ return Response
+ .ok(pcapResponse.getPcaps(), MediaType.APPLICATION_OCTET_STREAM)
+ .status(200).build();
+
+ }
+
+
+ @GET
+ @Path("/pcapGetter/getPcapsByKeyRange")
+
+ public Response getPcapsByKeyRange(
+ @QueryParam("startKey") String startKey,
+ @QueryParam("endKey")String endKey,
+ @QueryParam("maxResponseSize") String maxResponseSize,
+ @DefaultValue("-1") @QueryParam("startTime")long startTime,
+ @DefaultValue("-1") @QueryParam("endTime") long endTime,
+ @Context HttpServletResponse servlet_response) throws IOException {
+
+ if (startKey == null || startKey.equals(""))
+ return Response.serverError().status(Response.Status.NO_CONTENT)
+ .entity("'start key' must not be null or empty").build();
+
+ if (startKey == null || startKey.equals(""))
+ return Response.serverError().status(Response.Status.NO_CONTENT)
+ .entity("'end key' must not be null or empty").build();
+
+
+ byte[] response = null;
+ try {
+ IPcapScanner pcapScanner = PcapScannerHBaseImpl.getInstance();
+ response = pcapScanner.getPcaps(startKey, endKey,
+ ConfigurationUtil.validateMaxResultSize(maxResponseSize), startTime,
+ endTime);
+ if (response == null || response.length == 0) {
+
+ return Response.status(Response.Status.NO_CONTENT).build();
+
+ }
+ servlet_response.setHeader(HEADER_CONTENT_DISPOSITION_NAME,
+ HEADER_CONTENT_DISPOSITION_VALUE);
+
+ } catch (IOException e) {
+ LOGGER.error(
+ "Exception occurred while fetching Pcaps for the key range : startKey="
+ + startKey + ", endKey=" + endKey, e);
+ throw e;
+ }
+ // return http status '200 OK' along with the complete pcaps response file,
+ // and headers
+
+ return Response
+ .ok(response, MediaType.APPLICATION_OCTET_STREAM)
+ .status(200).build();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * com.cisco.opensoc.hbase.client.IPcapReceiver#getPcapsByIdentifiers(java.lang
+ * .String, java.lang.String, java.lang.String, java.lang.String,
+ * java.lang.String, long, long, boolean,
+ * javax.servlet.http.HttpServletResponse)
+ */
+
+ @GET
+ @Path("/pcapGetter/getPcapsByIdentifiers")
+
+ public Response getPcapsByIdentifiers(
+ @QueryParam ("srcIp") String srcIp,
+ @QueryParam ("dstIp") String dstIp,
+ @QueryParam ("protocol") String protocol,
+ @QueryParam ("srcPort") String srcPort,
+ @QueryParam ("dstPort") String dstPort,
+ @DefaultValue("-1") @QueryParam ("startTime")long startTime,
+ @DefaultValue("-1") @QueryParam ("endTime")long endTime,
+ @DefaultValue("false") @QueryParam ("includeReverseTraffic") boolean includeReverseTraffic,
+ @Context HttpServletResponse servlet_response)
+
+ throws IOException {
+
+ if (srcIp == null || srcIp.equals(""))
+ return Response.serverError().status(Response.Status.NO_CONTENT)
+ .entity("'srcIp' must not be null or empty").build();
+
+ if (dstIp == null || dstIp.equals(""))
+ return Response.serverError().status(Response.Status.NO_CONTENT)
+ .entity("'dstIp' must not be null or empty").build();
+
+ if (protocol == null || protocol.equals(""))
+ return Response.serverError().status(Response.Status.NO_CONTENT)
+ .entity("'protocol' must not be null or empty").build();
+
+ if (srcPort == null || srcPort.equals(""))
+ return Response.serverError().status(Response.Status.NO_CONTENT)
+ .entity("'srcPort' must not be null or empty").build();
+
+ if (dstPort == null || dstPort.equals(""))
+ return Response.serverError().status(Response.Status.NO_CONTENT)
+ .entity("'dstPort' must not be null or empty").build();
+
+
+ PcapsResponse response = null;
+ try {
+ String sessionKey = PcapUtils.getSessionKey(srcIp, dstIp, protocol,
+ srcPort, dstPort);
+ LOGGER.info("sessionKey =" + sessionKey);
+ IPcapGetter pcapGetter = PcapGetterHBaseImpl.getInstance();
+ response = pcapGetter.getPcaps(Arrays.asList(sessionKey), null,
+ startTime, endTime, includeReverseTraffic, false,
+ ConfigurationUtil.getDefaultResultSize());
+ if (response == null || response.getResponseSize() == 0) {
+ return Response.status(Response.Status.NO_CONTENT).build();
+ }
+ servlet_response.setHeader(HEADER_CONTENT_DISPOSITION_NAME,
+ HEADER_CONTENT_DISPOSITION_VALUE);
+
+ } catch (IOException e) {
+ LOGGER.error("Exception occurred while fetching Pcaps by identifiers :",
+ e);
+ throw e;
+ }
+ // return http status '200 OK' along with the complete pcaps response file,
+ // and headers
+ return Response
+ .ok(response.getPcaps(), MediaType.APPLICATION_OCTET_STREAM)
+ .status(200).build();
+ }
+ /**
+ * This method parses the each value in the List using delimiter ',' and
+ * builds a new List;.
+ *
+ * @param keys
+ * list of keys to be parsed
+ * @return list of keys
+ */
+ @VisibleForTesting
+ List<String> parseKeys(List<String> keys) {
+ // Assert.notEmpty(keys);
+ List<String> parsedKeys = new ArrayList<String>();
+ for (String key : keys) {
+ parsedKeys.addAll(Arrays.asList(StringUtils.split(
+ StringUtils.trim(key), ",")));
+ }
+ return parsedKeys;
+ }
+}