You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/01/26 15:18:12 UTC

[34/89] [abbrv] [partial] incubator-metron git commit: Rename all OpenSOC files to Metron

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Pcap_Service/README.txt
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/README.txt b/metron-streaming/Metron-Pcap_Service/README.txt
new file mode 100644
index 0000000..8aba23e
--- /dev/null
+++ b/metron-streaming/Metron-Pcap_Service/README.txt
@@ -0,0 +1,16 @@
+The 'hbase' module of the 'opensoc' project contains the code to communicate with HBase. This module has several APIs (refer to the IPcapGetter.java and IPcapScanner.java files)
+to fetch pcaps from HBase. The following APIs have been created under this module's implementation.
+
+APIs (in IPcapGetter.java) to get pcaps using keys:
+ 1. public PcapsResponse getPcaps(List<String> keys, String lastRowKey, long startTime, long endTime, boolean includeReverseTraffic, boolean includeDuplicateLastRow, long maxResultSize) throws IOException;
+ 2. public PcapsResponse getPcaps(String key, long startTime, long endTime, boolean includeReverseTraffic) throws IOException;
+ 3. public PcapsResponse getPcaps(List<String> keys) throws IOException;
+ 4. public PcapsResponse getPcaps(String key) throws IOException;
+
+APIs (in IPcapScanner.java) to get pcaps using a key range:
+ 1. public byte[] getPcaps(String startKey, String endKey, long maxResponseSize, long startTime, long endTime) throws IOException;
+ 2. public byte[] getPcaps(String startKey, String endKey) throws IOException;
+ 
+ 
+Refer to the wiki documentation for further details: https://hwcsco.atlassian.net/wiki/pages/viewpage.action?pageId=5242892
+ 	
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Pcap_Service/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/pom.xml b/metron-streaming/Metron-Pcap_Service/pom.xml
new file mode 100644
index 0000000..ecbce82
--- /dev/null
+++ b/metron-streaming/Metron-Pcap_Service/pom.xml
@@ -0,0 +1,267 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>com.opensoc</groupId>
+		<artifactId>OpenSOC-Streaming</artifactId>
+		<version>0.6BETA</version>
+	</parent>
+	<artifactId>OpenSOC-Pcap_Service</artifactId>
+	<description>OpenSOC Pcap_Service</description>
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<flume.version>1.4.0.2.0.6.0-76</flume.version>
+		<hadoop.version>2.2.0.2.0.6.0-76</hadoop.version>
+		<maven.compiler.target>1.7</maven.compiler.target>
+		<maven.compiler.source>1.7</maven.compiler.source>
+		<storm.version>0.9.2-incubating</storm.version>
+		<kafka.version>0.8.0</kafka.version>
+		<slf4j.version>1.7.5</slf4j.version>
+		<zookeeper.version>3.4.5.2.0.6.0-76</zookeeper.version>
+		<logger.version>1.2.15</logger.version>
+
+		<storm-kafka.version>0.9.2-incubating</storm-kafka.version>
+		<storm-hdfs.version>0.0.7-SNAPSHOT</storm-hdfs.version>
+		<storm-hbase.version>0.0.5-SNAPSHOT</storm-hbase.version>
+
+		<spring.integration.version>3.0.0.RELEASE</spring.integration.version>
+		<spring.version>3.2.6.RELEASE</spring.version>
+		<commons-fileupload.version>1.2.2</commons-fileupload.version>
+		<commons-io.version>2.4</commons-io.version>
+		<commons-configuration.version>1.10</commons-configuration.version>
+		<commons-lang.version>2.6</commons-lang.version>
+		<commons-collections.version>3.2.1</commons-collections.version>
+		<commons-beanutils.version>1.8.3</commons-beanutils.version>
+		<commons-jexl.version>2.1.1</commons-jexl.version>
+
+
+		<junit.version>4.11</junit.version>
+		<hamcrest.version>1.3</hamcrest.version>
+		<mockito.version>1.9.5</mockito.version>
+		<elastic-search.version>1.3.0</elastic-search.version>
+	</properties>
+	<dependencies>
+		<dependency>
+			<groupId>org.jboss.resteasy</groupId>
+			<artifactId>jaxrs-api</artifactId>
+			<version>3.0.4.Final</version>
+		</dependency>
+		<dependency>
+			<groupId>com.opensoc</groupId>
+			<artifactId>OpenSOC-Common</artifactId>
+			<version>${project.parent.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>commons-beanutils</groupId>
+			<artifactId>commons-beanutils</artifactId>
+			<version>${commons-beanutils.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.commons</groupId>
+			<artifactId>commons-jexl</artifactId>
+			<version>${commons-jexl.version}</version>
+		</dependency>
+
+		<dependency>
+			<artifactId>commons-configuration</artifactId>
+			<groupId>commons-configuration</groupId>
+			<version>${commons-configuration.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-api</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>${junit.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.powermock</groupId>
+			<artifactId>powermock-api-mockito</artifactId>
+			<version>1.5</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.powermock</groupId>
+			<artifactId>powermock-core</artifactId>
+			<version>1.5</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.powermock</groupId>
+			<artifactId>powermock-module-junit4</artifactId>
+			<version>1.5</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>joda-time</groupId>
+			<artifactId>joda-time</artifactId>
+			<version>2.3</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hbase</groupId>
+			<artifactId>hbase-client</artifactId>
+			<version>${global_hbase_version}</version>
+			<scope>provided</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-log4j12</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hbase</groupId>
+			<artifactId>hbase-testing-util</artifactId>
+			<version>${global_hbase_version}</version>
+			<scope>provided</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-log4j12</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-common</artifactId>
+			<version>${global_hadoop_version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-hdfs</artifactId>
+			<version>${global_hadoop_version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.springframework.integration</groupId>
+			<artifactId>spring-integration-http</artifactId>
+			<version>${spring.integration.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.springframework</groupId>
+			<artifactId>spring-webmvc</artifactId>
+			<version>${spring.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>log4j</groupId>
+			<artifactId>log4j</artifactId>
+			<version>${logger.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>com.sun.jmx</groupId>
+					<artifactId>jmxri</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jdmk</groupId>
+					<artifactId>jmxtools</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.jms</groupId>
+					<artifactId>jms</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+
+
+
+
+		<dependency>
+			<groupId>org.jboss.resteasy</groupId>
+			<artifactId>resteasy-jaxrs</artifactId>
+			<version>3.0.1.Final</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-simple</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.jboss.resteasy</groupId>
+			<artifactId>resteasy-jaxb-provider</artifactId>
+			<version>3.0.1.Final</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.jboss.resteasy</groupId>
+			<artifactId>async-http-servlet-3.0</artifactId>
+			<version>3.0.1.Final</version>
+			<scope>compile</scope>
+		</dependency>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+		<dependency>
+			<groupId>org.eclipse.jetty</groupId>
+			<artifactId>jetty-server</artifactId>
+			<version>9.3.0.M0</version>
+		</dependency>
+		<dependency>
+			<groupId>org.eclipse.jetty</groupId>
+			<artifactId>jetty-servlet</artifactId>
+			<version>9.3.0.M0</version>
+		</dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-simple</artifactId>
+			<version>${global_slf4j_version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-api</artifactId>
+			<version>${global_slf4j_version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+			<version>${global_slf4j_version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<configuration>
+					<archive>
+						<manifest>
+							<mainClass>com.opensoc.pcapservice.rest.PcapService</mainClass>
+						</manifest>
+					</archive>
+					<descriptorRefs>
+						<descriptorRef>jar-with-dependencies</descriptorRef>
+					</descriptorRefs>
+				</configuration>
+				<executions>
+					<execution>
+						<id>make-assembly</id> <!-- this is used for inheritance merges -->
+						<phase>package</phase> <!-- bind to the packaging phase -->
+						<goals>
+							<goal>single</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Pcap_Service/pom.xml.versionsBackup
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/pom.xml.versionsBackup b/metron-streaming/Metron-Pcap_Service/pom.xml.versionsBackup
new file mode 100644
index 0000000..a400fe2
--- /dev/null
+++ b/metron-streaming/Metron-Pcap_Service/pom.xml.versionsBackup
@@ -0,0 +1,268 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>com.opensoc</groupId>
+		<artifactId>OpenSOC-Streaming</artifactId>
+		<version>0.4BETA</version>
+	</parent>
+	<artifactId>OpenSOC-Pcap_Service</artifactId>
+	<description>OpenSOC Pcap_Service</description>
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<flume.version>1.4.0.2.0.6.0-76</flume.version>
+		<hadoop.version>2.2.0.2.0.6.0-76</hadoop.version>
+		<maven.compiler.source>${jdk.version}</maven.compiler.source>
+		<maven.compiler.target>${jdk.version}</maven.compiler.target>
+
+		<storm.version>0.9.2-incubating</storm.version>
+		<kafka.version>0.8.0</kafka.version>
+		<slf4j.version>1.7.5</slf4j.version>
+		<zookeeper.version>3.4.5.2.0.6.0-76</zookeeper.version>
+		<logger.version>1.2.15</logger.version>
+
+		<storm-kafka.version>0.9.2-incubating</storm-kafka.version>
+		<storm-hdfs.version>0.0.7-SNAPSHOT</storm-hdfs.version>
+		<storm-hbase.version>0.0.5-SNAPSHOT</storm-hbase.version>
+
+		<spring.integration.version>3.0.0.RELEASE</spring.integration.version>
+		<spring.version>3.2.6.RELEASE</spring.version>
+		<commons-fileupload.version>1.2.2</commons-fileupload.version>
+		<commons-io.version>2.4</commons-io.version>
+		<commons-configuration.version>1.10</commons-configuration.version>
+		<commons-lang.version>2.6</commons-lang.version>
+		<commons-collections.version>3.2.1</commons-collections.version>
+		<commons-beanutils.version>1.8.3</commons-beanutils.version>
+		<commons-jexl.version>2.1.1</commons-jexl.version>
+
+
+		<junit.version>4.11</junit.version>
+		<hamcrest.version>1.3</hamcrest.version>
+		<mockito.version>1.9.5</mockito.version>
+		<elastic-search.version>1.3.0</elastic-search.version>
+	</properties>
+	<dependencies>
+		<dependency>
+			<groupId>org.jboss.resteasy</groupId>
+			<artifactId>jaxrs-api</artifactId>
+			<version>3.0.4.Final</version>
+		</dependency>
+		<dependency>
+			<groupId>com.opensoc</groupId>
+			<artifactId>OpenSOC-Common</artifactId>
+			<version>${parent.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>commons-beanutils</groupId>
+			<artifactId>commons-beanutils</artifactId>
+			<version>${commons-beanutils.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.commons</groupId>
+			<artifactId>commons-jexl</artifactId>
+			<version>${commons-jexl.version}</version>
+		</dependency>
+
+		<dependency>
+			<artifactId>commons-configuration</artifactId>
+			<groupId>commons-configuration</groupId>
+			<version>${commons-configuration.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-api</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>${junit.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.powermock</groupId>
+			<artifactId>powermock-api-mockito</artifactId>
+			<version>1.5</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.powermock</groupId>
+			<artifactId>powermock-core</artifactId>
+			<version>1.5</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.powermock</groupId>
+			<artifactId>powermock-module-junit4</artifactId>
+			<version>1.5</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>joda-time</groupId>
+			<artifactId>joda-time</artifactId>
+			<version>2.3</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hbase</groupId>
+			<artifactId>hbase-client</artifactId>
+			<version>${global_hbase_version}</version>
+			<scope>provided</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-log4j12</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hbase</groupId>
+			<artifactId>hbase-testing-util</artifactId>
+			<version>${global_hbase_version}</version>
+			<scope>provided</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-log4j12</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-common</artifactId>
+			<version>${global_hadoop_version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-hdfs</artifactId>
+			<version>${global_hadoop_version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.springframework.integration</groupId>
+			<artifactId>spring-integration-http</artifactId>
+			<version>${spring.integration.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.springframework</groupId>
+			<artifactId>spring-webmvc</artifactId>
+			<version>${spring.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>log4j</groupId>
+			<artifactId>log4j</artifactId>
+			<version>${logger.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>com.sun.jmx</groupId>
+					<artifactId>jmxri</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jdmk</groupId>
+					<artifactId>jmxtools</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.jms</groupId>
+					<artifactId>jms</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+
+
+
+
+		<dependency>
+			<groupId>org.jboss.resteasy</groupId>
+			<artifactId>resteasy-jaxrs</artifactId>
+			<version>3.0.1.Final</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-simple</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.jboss.resteasy</groupId>
+			<artifactId>resteasy-jaxb-provider</artifactId>
+			<version>3.0.1.Final</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.jboss.resteasy</groupId>
+			<artifactId>async-http-servlet-3.0</artifactId>
+			<version>3.0.1.Final</version>
+			<scope>compile</scope>
+		</dependency>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+		<dependency>
+			<groupId>org.eclipse.jetty</groupId>
+			<artifactId>jetty-server</artifactId>
+			<version>9.3.0.M0</version>
+		</dependency>
+		<dependency>
+			<groupId>org.eclipse.jetty</groupId>
+			<artifactId>jetty-servlet</artifactId>
+			<version>9.3.0.M0</version>
+		</dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-simple</artifactId>
+			<version>${global_slf4j_version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-api</artifactId>
+			<version>${global_slf4j_version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+			<version>${global_slf4j_version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<configuration>
+					<archive>
+						<manifest>
+							<mainClass>com.opensoc.pcapservice.rest.PcapService</mainClass>
+						</manifest>
+					</archive>
+					<descriptorRefs>
+						<descriptorRef>jar-with-dependencies</descriptorRef>
+					</descriptorRefs>
+				</configuration>
+				<executions>
+					<execution>
+						<id>make-assembly</id> <!-- this is used for inheritance merges -->
+						<phase>package</phase> <!-- bind to the packaging phase -->
+						<goals>
+							<goal>single</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/CellTimestampComparator.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/CellTimestampComparator.java b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/CellTimestampComparator.java
new file mode 100644
index 0000000..e45d849
--- /dev/null
+++ b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/CellTimestampComparator.java
@@ -0,0 +1,23 @@
+package com.opensoc.pcapservice;
+
+import java.util.Comparator;
+
+import org.apache.hadoop.hbase.Cell;
+
+/**
+ * Comparator created for sorting pcaps cells based on the timestamp (asc).
+ * 
+ * @author Sayi
+ */
+public class CellTimestampComparator implements Comparator<Cell> {
+
+  /**
+   * Orders two cells by ascending timestamp, comparing the primitive values
+   * directly so that no boxing happens per comparison.
+   *
+   * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
+   */
+  @Override
+  public int compare(Cell o1, Cell o2) {
+    return Long.compare(o1.getTimestamp(), o2.getTimestamp());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/ConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/ConfigurationUtil.java b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/ConfigurationUtil.java
new file mode 100644
index 0000000..be1a1bf
--- /dev/null
+++ b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/ConfigurationUtil.java
@@ -0,0 +1,269 @@
+package com.opensoc.pcapservice;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.springframework.util.Assert;
+
+import com.opensoc.configuration.ConfigurationManager;
+
+
+
+/**
+ * utility class for this module which loads commons configuration to fetch
+ * properties from underlying resources to communicate with hbase.
+ * 
+ * @author Sayi
+ */
+public class ConfigurationUtil {
+
+	/** Configuration definition file name for fetching pcaps from hbase */
+	private static final String configDefFileName = "config-definition-hbase.xml";
+	
+	/** property configuration. */
+	private static Configuration propConfiguration = null;
+
+
+	/**
+	 * The Enum SizeUnit.
+	 */
+	public enum SizeUnit {
+
+		/** The kb. */
+		KB,
+		/** The mb. */
+		MB
+	};
+
+	/** The Constant DEFAULT_HCONNECTION_RETRY_LIMIT. */
+	private static final int DEFAULT_HCONNECTION_RETRY_LIMIT = 0;
+
+	/**
+	 * Loads configuration resources 
+	 * @return Configuration
+	 */
+	public static Configuration getConfiguration() {
+		if(propConfiguration == null){
+			propConfiguration =  ConfigurationManager.getConfiguration(configDefFileName);
+		}
+		return propConfiguration;
+	}
+
+	/**
+	 * Resolves the effective max result size, in bytes. A null input yields
+	 * the configured default ({@link #getDefaultResultSize()}). Otherwise the
+	 * input is parsed as a whole number expressed in the configured result
+	 * size unit, converted to bytes once, and validated.
+	 * @param input
+	 *            requested size in the configured unit; may be null
+	 * @return the validated size in bytes
+	 * @throws IllegalArgumentException
+	 *             if input is not a number, is less than or equal to 0, or
+	 *             exceeds the configured {hbase.scan.max.result.size} value
+	 */
+	public static long validateMaxResultSize(String input) {
+		if (input == null) {
+			return getDefaultResultSize();
+		}
+		long value = convertToBytes(Long.parseLong(input), getResultSizeUnit());
+		Assert.isTrue(isAllowableResultSize(value),
+				"'maxResponseSize' param value must be positive and less than {hbase.scan.max.result.size} value");
+		// value is already in bytes; converting again would scale it twice.
+		return value;
+	}
+
+	/**
+	 * Tells whether a result size (in bytes) is acceptable: it must be
+	 * strictly positive and must not exceed the configured maximum returned
+	 * by {@link #getMaxResultSize()}.
+	 * 
+	 * @param input
+	 *            the candidate result size, in bytes
+	 * @return true if {@code 0 < input <= getMaxResultSize()}; false
+	 *         otherwise
+	 */
+	public static boolean isAllowableResultSize(long input) {
+		return input > 0 && input <= getMaxResultSize();
+	}
+
+	/**
+	 * Returns the configured default result size in bytes.
+	 * 
+	 * @return long
+	 */
+	public static long getDefaultResultSize() {
+		float value = ConfigurationUtil.getConfiguration().getFloat(
+				"hbase.scan.default.result.size");
+		return convertToBytes(value, getResultSizeUnit());
+	}
+
+	/**
+	 * Returns the configured max result size in bytes.
+	 * 
+	 * @return long
+	 */
+	public static long getMaxResultSize() {
+		float value = ConfigurationUtil.getConfiguration().getFloat(
+				"hbase.scan.max.result.size");
+		return convertToBytes(value, getResultSizeUnit());
+	}
+
+	/**
+	 * Returns the configured max row size in bytes.
+	 * 
+	 * @return long
+	 */
+	public static long getMaxRowSize() {
+		float maxRowSize = ConfigurationUtil.getConfiguration().getFloat(
+				"hbase.table.max.row.size");
+		return convertToBytes(maxRowSize, getRowSizeUnit());
+	}
+
+	/**
+	 * Gets the result size unit.
+	 * 
+	 * @return the result size unit
+	 */
+	public static SizeUnit getResultSizeUnit() {
+		return SizeUnit.valueOf(ConfigurationUtil.getConfiguration()
+				.getString("hbase.scan.result.size.unit"));
+	}
+
+	/**
+	 * Gets the row size unit.
+	 * 
+	 * @return the row size unit
+	 */
+	public static SizeUnit getRowSizeUnit() {
+		return SizeUnit.valueOf(ConfigurationUtil.getConfiguration()
+				.getString("hbase.table.row.size.unit"));
+	}
+
+	/**
+	 * Gets the connection retry limit.
+	 * 
+	 * @return the connection retry limit
+	 */
+	public static int getConnectionRetryLimit() {
+		return ConfigurationUtil.getConfiguration().getInt(
+				"hbase.hconnection.retries.number",
+				DEFAULT_HCONNECTION_RETRY_LIMIT);
+	}
+
+	/**
+	 * Checks if is default include reverse traffic.
+	 * 
+	 * @return true, if is default include reverse traffic
+	 */
+	public static boolean isDefaultIncludeReverseTraffic() {
+		return ConfigurationUtil.getConfiguration().getBoolean(
+				"pcaps.include.reverse.traffic");
+	}
+
+	/**
+	 * Gets the table name.
+	 * 
+	 * @return the table name
+	 */
+	public static byte[] getTableName() {
+		return Bytes.toBytes(ConfigurationUtil.getConfiguration().getString(
+				"hbase.table.name"));
+	}
+
+	/**
+	 * Gets the column family.
+	 * 
+	 * @return the column family
+	 */
+	public static byte[] getColumnFamily() {
+		return Bytes.toBytes(ConfigurationUtil.getConfiguration().getString(
+				"hbase.table.column.family"));
+	}
+
+	/**
+	 * Gets the column qualifier.
+	 * 
+	 * @return the column qualifier
+	 */
+	public static byte[] getColumnQualifier() {
+		return Bytes.toBytes(ConfigurationUtil.getConfiguration().getString(
+				"hbase.table.column.qualifier"));
+	}
+
+	/**
+	 * Gets the max versions.
+	 * 
+	 * @return the max versions
+	 */
+	public static int getMaxVersions() {
+		return ConfigurationUtil.getConfiguration().getInt(
+				"hbase.table.column.maxVersions");
+	}
+
+	/**
+	 * Gets the configured tokens in rowkey.
+	 * 
+	 * @return the configured tokens in rowkey
+	 */
+	public static int getConfiguredTokensInRowkey() {
+		return ConfigurationUtil.getConfiguration().getInt(
+				"hbase.table.row.key.tokens");
+	}
+
+	/**
+	 * Gets the minimum tokens in inputkey.
+	 * 
+	 * @return the minimum tokens in inputkey
+	 */
+	public static int getMinimumTokensInInputkey() {
+		return ConfigurationUtil.getConfiguration().getInt(
+				"rest.api.input.key.min.tokens");
+	}
+
+	/**
+	 * Gets the appending token digits.
+	 * 
+	 * @return the appending token digits
+	 */
+	public static int getAppendingTokenDigits() {
+		return ConfigurationUtil.getConfiguration().getInt(
+				"hbase.table.row.key.token.appending.digits");
+	}
+
+	/**
+	 * Convert to bytes.
+	 * 
+	 * @param value
+	 *            the value
+	 * @param unit
+	 *            the unit
+	 * @return the long
+	 */
+	public static long convertToBytes(float value, SizeUnit unit) {
+		if (SizeUnit.KB == unit) {
+			return (long) (value * 1024);
+		}
+		if (SizeUnit.MB == unit) {
+			return (long) (value * 1024 * 1024);
+		}
+		return (long) value;
+	}
+
+	/**
+	 * Manual smoke check: prints the configured max row size, max result
+	 * size (both in bytes) and their size units to standard output.
+	 * 
+	 * @param args
+	 *            ignored
+	 */
+	public static void main(String[] args) {
+		System.out.println("getMaxRowSizeInBytes = " + getMaxRowSize());
+		System.out.println("getMaxAllowableResultSizeInBytes = "
+				+ getMaxResultSize());
+
+		System.out.println("getMaxRowSizeUnit = "
+				+ getRowSizeUnit().toString());
+		System.out.println("getMaxAllowableResultsSizeUnit = "
+				+ getResultSizeUnit().toString());
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/HBaseConfigConstants.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/HBaseConfigConstants.java b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/HBaseConfigConstants.java
new file mode 100644
index 0000000..a7e7e3b
--- /dev/null
+++ b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/HBaseConfigConstants.java
@@ -0,0 +1,40 @@
+package com.opensoc.pcapservice;
+
+/**
+ * HBase configuration properties.
+ * 
+ * @author Sayi
+ */
+public class HBaseConfigConstants {
+
+  /** Configuration key for the HBase ZooKeeper quorum. */
+  public static final String HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
+
+  /** Configuration key for the HBase ZooKeeper client port. */
+  public static final String HBASE_ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.clientPort";
+
+  /** Configuration key for the ZooKeeper session timeout. */
+  public static final String HBASE_ZOOKEEPER_SESSION_TIMEOUT = "zookeeper.session.timeout";
+
+  /** Configuration key for the ZooKeeper recovery retry setting. */
+  public static final String HBASE_ZOOKEEPER_RECOVERY_RETRY = "zookeeper.recovery.retry";
+
+  /** Configuration key for the HBase client retries number. */
+  public static final String HBASE_CLIENT_RETRIES_NUMBER = "hbase.client.retries.number";
+
+  /** The delimeter ("-"). NOTE(review): a package-private, non-static, non-final instance field, unlike the constants around it; confirm that is intended. */
+  String delimeter = "-";
+
+  /** Regex matching a literal "-". NOTE(review): also a package-private, mutable instance field; confirm that is intended. */
+  String regex = "\\-";
+
+  /** Delimiter ("-") exposed as a constant; by its name, it separates the tokens of a pcap key. */
+  public static final String PCAP_KEY_DELIMETER = "-";
+
+  /** The name "startKey"; presumably the request parameter for the start of a key range (verify against callers). */
+  public static final String START_KEY = "startKey";
+
+  /** The name "endKey"; presumably the request parameter for the end of a key range (verify against callers). */
+  public static final String END_KEY = "endKey";
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/HBaseConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/HBaseConfigurationUtil.java b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/HBaseConfigurationUtil.java
new file mode 100644
index 0000000..8a5c022
--- /dev/null
+++ b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/HBaseConfigurationUtil.java
@@ -0,0 +1,165 @@
+/**
+ * 
+ */
+package com.opensoc.pcapservice;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.log4j.Logger;
+import org.mortbay.log.Log;
+
+/**
+ * Utility class which creates HConnection instance when the first request is
+ * received and registers a shut down hook which closes the connection when the
+ * JVM exits. Creates new connection to the cluster only if the existing
+ * connection is closed for unknown reasons. Also creates Configuration with
+ * HBase resources using configuration properties.
+ * 
+ * @author Sayi
+ * 
+ */
+public class HBaseConfigurationUtil {
+
+  /** The Constant LOGGER. */
+  private static final Logger LOGGER = Logger
+      .getLogger(HBaseConfigurationUtil.class);
+
+  /** Configuration which holds all HBase properties. */
+  private static Configuration config;
+
+  /**
+   * A cluster connection which knows how to find master node and locate regions
+   * on the cluster.
+   */
+  private static HConnection clusterConnection = null;
+
+  /**
+   * Creates HConnection instance when the first request is received and returns
+   * the same instance for all subsequent requests if the connection is still
+   * open.
+   * 
+   * @return HConnection instance
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  public static HConnection getConnection() throws IOException {
+    if (!connectionAvailable()) {
+      synchronized (HBaseConfigurationUtil.class) {
+        createClusterConncetion();
+      }
+    }
+    return clusterConnection;
+  }
+
+  /**
+   * Creates the cluster conncetion.
+   * 
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  private static void createClusterConncetion() throws IOException {
+    try {
+      if (connectionAvailable()) {
+        return;
+      }
+      clusterConnection = HConnectionManager.createConnection(read());
+      addShutdownHook();
+      System.out.println("Created HConnection and added shutDownHook");
+    } catch (IOException e) {
+      LOGGER
+          .error(
+              "Exception occurred while creating HConnection using HConnectionManager",
+              e);
+      throw e;
+    }
+  }
+
+  /**
+   * Connection available.
+   * 
+   * @return true, if successful
+   */
+  private static boolean connectionAvailable() {
+    if (clusterConnection == null) {
+      System.out.println("clusterConnection=" + clusterConnection);
+      return false;
+    }
+    System.out.println("clusterConnection.isClosed()="
+        + clusterConnection.isClosed());
+    return clusterConnection != null && !clusterConnection.isClosed();
+  }
+
+  /**
+   * Adds the shutdown hook.
+   */
+  private static void addShutdownHook() {
+    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+      public void run() {
+        System.out
+            .println("Executing ShutdownHook HBaseConfigurationUtil : Closing HConnection");
+        try {
+          clusterConnection.close();
+        } catch (IOException e) {
+          Log.debug("Caught ignorable exception ", e);
+        }
+      }
+    }, "HBaseConfigurationUtilShutDown"));
+  }
+
+  /**
+   * Closes the underlying connection to cluster; ignores if any exception is
+   * thrown.
+   */
+  public static void closeConnection() {
+    if (clusterConnection != null) {
+      try {
+        clusterConnection.close();
+      } catch (IOException e) {
+        Log.debug("Caught ignorable exception ", e);
+      }
+    }
+  }
+
+  /**
+   * This method creates Configuration with HBase resources using configuration
+   * properties. The same Configuration object will be used to communicate with
+   * all HBase tables;
+   * 
+   * @return Configuration object
+   */
+  public static Configuration read() {
+    if (config == null) {
+      synchronized (HBaseConfigurationUtil.class) {
+        if (config == null) {
+          config = HBaseConfiguration.create();
+
+          config.set(
+              HBaseConfigConstants.HBASE_ZOOKEEPER_QUORUM,
+              ConfigurationUtil.getConfiguration().getString(
+                  "hbase.zookeeper.quorum"));
+          config.set(
+              HBaseConfigConstants.HBASE_ZOOKEEPER_CLIENT_PORT,
+              ConfigurationUtil.getConfiguration().getString(
+                  "hbase.zookeeper.clientPort"));
+          config.set(
+              HBaseConfigConstants.HBASE_CLIENT_RETRIES_NUMBER,
+              ConfigurationUtil.getConfiguration().getString(
+                  "hbase.client.retries.number"));
+          config.set(
+              HBaseConfigConstants.HBASE_ZOOKEEPER_SESSION_TIMEOUT,
+              ConfigurationUtil.getConfiguration().getString(
+                  "zookeeper.session.timeout"));
+          config.set(
+              HBaseConfigConstants.HBASE_ZOOKEEPER_RECOVERY_RETRY,
+              ConfigurationUtil.getConfiguration().getString(
+                  "zookeeper.recovery.retry"));
+        }
+      }
+    }
+    return config;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/IPcapGetter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/IPcapGetter.java b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/IPcapGetter.java
new file mode 100644
index 0000000..dbff59c
--- /dev/null
+++ b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/IPcapGetter.java
@@ -0,0 +1,88 @@
+/**
+ * 
+ */
+package com.opensoc.pcapservice;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Interface for all key-based pcap fetching methods.
+ * 
+ * @author Sayi
+ */
+public interface IPcapGetter {
+
+  /**
+   * Gets the pcaps for the input list of keys and lastRowKey.
+   * 
+   * @param keys
+   *          the list of keys for which pcaps are to be retrieved
+   * @param lastRowKey
+   *          last row key from the previous partial response
+   * @param startTime
+   *          the start time in system milliseconds to be used to filter the
+   *          pcaps. The value is set to '0' if the caller sends a negative value
+   * @param endTime
+   *          the end time in system milliseconds to be used to filter the
+   *          pcaps. The value is set to Long.MAX_VALUE if the caller sends a
+   *          negative value. 'endTime' must be greater than the 'startTime'.
+   * @param includeReverseTraffic
+   *          indicates whether or not to include pcaps from the reverse traffic
+   * @param includeDuplicateLastRow
+   *          indicates whether or not to include the last row from the previous
+   *          partial response
+   * @param maxResultSize
+   *          the max result size
+   * @return PcapsResponse with all matching pcaps merged together
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  public PcapsResponse getPcaps(List<String> keys, String lastRowKey,
+      long startTime, long endTime, boolean includeReverseTraffic,
+      boolean includeDuplicateLastRow, long maxResultSize) throws IOException;
+
+  /**
+   * Gets the pcaps for the input key.
+   * 
+   * @param key
+   *          the key for which pcaps are to be retrieved.
+   * @param startTime
+   *          the start time in system milliseconds to be used to filter the
+   *          pcaps. The value is set to '0' if the caller sends a negative value
+   * @param endTime
+   *          the end time in system milliseconds to be used to filter the
+   *          pcaps. The value is set to Long.MAX_VALUE if the caller sends a
+   *          negative value. 'endTime' must be greater than the 'startTime'.
+   * @param includeReverseTraffic
+   *          indicates whether or not to include pcaps from the reverse traffic
+   * @return PcapsResponse with all matching pcaps merged together
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  public PcapsResponse getPcaps(String key, long startTime, long endTime,
+      boolean includeReverseTraffic) throws IOException;
+
+  /**
+   * Gets the pcaps for the input list of keys.
+   * 
+   * @param keys
+   *          the list of keys for which pcaps are to be retrieved.
+   * @return PcapsResponse with all matching pcaps merged together
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  public PcapsResponse getPcaps(List<String> keys) throws IOException;
+
+  /**
+   * Gets the pcaps for the input key.
+   * 
+   * @param key
+   *          the key for which pcaps are to be retrieved.
+   * @return PcapsResponse with all matching pcaps merged together
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  public PcapsResponse getPcaps(String key) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/IPcapScanner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/IPcapScanner.java b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/IPcapScanner.java
new file mode 100644
index 0000000..64408e9
--- /dev/null
+++ b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/IPcapScanner.java
@@ -0,0 +1,49 @@
+package com.opensoc.pcapservice;
+
+import java.io.IOException;
+
+/**
+ * Interface for all pcap fetching methods that work on a key range.
+ */
+public interface IPcapScanner {
+
+  /**
+   * Gets the pcaps between startKey (inclusive) and endKey (exclusive).
+   * 
+   * @param startKey
+   *          the start key of a key range for which pcaps are to be retrieved.
+   * @param endKey
+   *          the end key of a key range for which pcaps are to be retrieved.
+   * @param maxResponseSize
+   *          indicates the maximum response size in MegaBytes(MB). The caller
+   *          must pass a positive value that is less than 60 (MB)
+   * @param startTime
+   *          the start time in system milliseconds to be used to filter the
+   *          pcaps. The value is set to '0' if the caller sends a negative value
+   * @param endTime
+   *          the end time in system milliseconds to be used to filter the
+   *          pcaps. The value is set to Long.MAX_VALUE if the caller sends a
+   *          negative value
+   * @return byte array with all matching pcaps merged together
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  public byte[] getPcaps(String startKey, String endKey, long maxResponseSize,
+      long startTime, long endTime) throws IOException;
+
+  /**
+   * Gets the pcaps between startKey (inclusive) and endKey (exclusive).
+   * 
+   * @param startKey
+   *          the start key (inclusive) of a key range for which pcaps are to be
+   *          retrieved.
+   * @param endKey
+   *          the end key (exclusive) of a key range for which pcaps are to be
+   *          retrieved.
+   * @return byte array with all matching pcaps merged together
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  public byte[] getPcaps(String startKey, String endKey) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/PcapGetterHBaseImpl.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/PcapGetterHBaseImpl.java b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/PcapGetterHBaseImpl.java
new file mode 100644
index 0000000..b06137d
--- /dev/null
+++ b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/PcapGetterHBaseImpl.java
@@ -0,0 +1,809 @@
+package com.opensoc.pcapservice;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Resource;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Response;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.NoServerForRegionException;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Logger;
+import org.springframework.util.Assert;
+import org.springframework.util.CollectionUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Singleton class which integrates with HBase table and returns pcaps sorted by
+ * timestamp(dsc) for the given list of keys. Creates HConnection if it is not
+ * already created and the same connection instance is being used for all
+ * requests
+ * 
+ * @author sheetal
+ * @version $Revision: 1.0 $
+ */
+
+@Path("/")
+public class PcapGetterHBaseImpl implements IPcapGetter {
+
+  /** The singleton instance; volatile because getInstance() reads it unlocked. */
+  private static volatile IPcapGetter pcapGetterHBase = null;
+
+  /** The Constant LOG. */
+  private static final Logger LOGGER = Logger
+      .getLogger(PcapGetterHBaseImpl.class);
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see com.cisco.opensoc.hbase.client.IPcapGetter#getPcaps(java.util.List,
+   * java.lang.String, long, long, boolean, boolean, long)
+   */
+ 
+  
+	@GET
+	@Path("pcap/test")
+	@Produces("text/html")
+	public Response  index() throws URISyntaxException { 
+		return Response.ok("ALL GOOD").build();   
+	}
+	
+	
+  /**
+   * Scans 'lastRowKey' first (when given) and then every input key that was not
+   * already covered by the previous partial response, in ascending key order.
+   * Returns early as soon as the response status becomes PARTIAL.
+   * @see IPcapGetter#getPcaps(java.util.List, java.lang.String, long, long,
+   *      boolean, boolean, long)
+   */
+  public PcapsResponse getPcaps(List<String> keys, String lastRowKey,
+      long startTime, long endTime, boolean includeReverseTraffic,
+      boolean includeDuplicateLastRow, long maxResultSize) throws IOException {
+    Assert
+        .isTrue(
+            checkIfValidInput(keys, lastRowKey),
+            "No valid input. One of the value must be present from {keys, lastRowKey}");
+    // 'keys' may be null when only 'lastRowKey' is sent: avoid keys.toString()
+    LOGGER.info(" keys=" + keys + ";  lastRowKey=" + lastRowKey);
+
+    PcapsResponse pcapsResponse = new PcapsResponse();
+    // 1. Process partial response key
+    if (StringUtils.isNotEmpty(lastRowKey)) {
+      pcapsResponse = processKey(pcapsResponse, lastRowKey, startTime,
+          endTime, true, includeDuplicateLastRow, maxResultSize);
+      if (pcapsResponse.getStatus() == PcapsResponse.Status.PARTIAL) {
+        return pcapsResponse;
+      }
+    }
+    // 2. Process input keys; there are none when only 'lastRowKey' was sent
+    if (CollectionUtils.isEmpty(keys)) {
+      return pcapsResponse;
+    }
+    List<String> unprocessedKeys = sortKeysByAscOrder(keys,
+        includeReverseTraffic);
+    if (StringUtils.isNotEmpty(lastRowKey)) {
+      unprocessedKeys = getUnprocessedSublistOfKeys(unprocessedKeys, lastRowKey);
+    }
+    LOGGER.info("unprocessedKeys in getPcaps" + unprocessedKeys.toString());
+    for (String unprocessedKey : unprocessedKeys) {
+      pcapsResponse = processKey(pcapsResponse, unprocessedKey, startTime,
+          endTime, false, includeDuplicateLastRow, maxResultSize);
+      if (pcapsResponse.getStatus() == PcapsResponse.Status.PARTIAL) {
+        return pcapsResponse;
+      }
+    }
+    return pcapsResponse;
+  }
+
+  /**
+   * Single-key overload: delegates with no 'lastRowKey' and the result size
+   * from ConfigurationUtil.getDefaultResultSize().
+   * @see IPcapGetter#getPcaps(java.lang.String, long, long, boolean)
+   */
+  public PcapsResponse getPcaps(String key, long startTime, long endTime,
+      boolean includeReverseTraffic) throws IOException {
+    Assert.hasText(key, "key must not be null or empty");
+    // Mutable copy: Arrays.asList is fixed-size and callees may add reverse keys.
+    List<String> keys = new ArrayList<String>(Arrays.asList(key));
+    return getPcaps(keys, null, startTime, endTime, includeReverseTraffic,
+        false, ConfigurationUtil.getDefaultResultSize());
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see com.cisco.opensoc.hbase.client.IPcapGetter#getPcaps(java.util.List)
+   */
+ 
+  public PcapsResponse getPcaps(List<String> keys) throws IOException {
+    Assert.notEmpty(keys, "'keys' must not be null or empty");
+    return getPcaps(keys, null, -1, -1,
+        ConfigurationUtil.isDefaultIncludeReverseTraffic(), false,
+        ConfigurationUtil.getDefaultResultSize());
+  }
+
+  /**
+   * Single-key overload: no time bounds (-1, -1), ConfigurationUtil defaults.
+   * @see IPcapGetter#getPcaps(java.lang.String)
+   */
+  public PcapsResponse getPcaps(String key) throws IOException {
+    Assert.hasText(key, "key must not be null or empty");
+    // Mutable copy: Arrays.asList is fixed-size and callees may add reverse keys.
+    List<String> keys = new ArrayList<String>(Arrays.asList(key));
+    return getPcaps(keys, null, -1, -1,
+        ConfigurationUtil.isDefaultIncludeReverseTraffic(), false,
+        ConfigurationUtil.getDefaultResultSize());
+  }
+
+  /**
+   * Returns the singleton, creating it under the class lock on first use.
+   * 
+   * @return IPcapGetter singleton instance
+   * @throws IOException declared by the signature; nothing here throws it
+   */
+  public static IPcapGetter getInstance() throws IOException {
+    if (pcapGetterHBase != null) {
+      return pcapGetterHBase;
+    }
+    synchronized (PcapGetterHBaseImpl.class) {
+      if (pcapGetterHBase == null) {
+        pcapGetterHBase = new PcapGetterHBaseImpl();
+      }
+      return pcapGetterHBase;
+    }
+  }
+
+  /**
+   * Private constructor; getInstance() creates the shared instance with it.
+   */
+  private PcapGetterHBaseImpl() {
+  }
+
+  /**
+   * Returns a new list with the input keys plus, when 'includeReverseTraffic' is
+   * set, their reverse keys; duplicates removed, sorted ascending. The input
+   * list itself is not modified.
+   * @param keys input keys; must not be null or empty
+   * @param includeReverseTraffic flag whether or not to include reverse traffic
+   * @return List<String>
+   */
+  @VisibleForTesting
+  List<String> sortKeysByAscOrder(List<String> keys,
+      boolean includeReverseTraffic) {
+    Assert.notEmpty(keys, "'keys' must not be null");
+    // Copy first: the caller's list may be fixed-size and must stay unmodified.
+    List<String> allKeys = new ArrayList<String>(keys);
+    if (includeReverseTraffic) {
+      allKeys.addAll(PcapHelper.reverseKey(keys));
+    }
+    List<String> deDupKeys = removeDuplicateKeys(allKeys);
+    Collections.sort(deDupKeys);
+    return deDupKeys;
+  }
+
+  /**
+   * Removes the duplicate keys.
+   * 
+   * @param keys
+   *          the keys
+   * @return the list
+   */
+  @VisibleForTesting
+public
+  List<String> removeDuplicateKeys(List<String> keys) {
+    Set<String> set = new HashSet<String>(keys);
+    return new ArrayList<String>(set);
+  }
+
+  /**
+   * <p>
+   * Returns the sublist starting from the element after the one matching the
+   * lastRowKey to the last element in the list; if the 'lastRowKey' is not
+   * matched the complete list will be returned. Only the first 5 tokens of
+   * 'lastRowKey' are compared with the keys.
+   * </p>
+   * 
+   * <pre>
+   * Eg :
+   *  keys = [18800006-1800000b-06-0019-caac, 18800006-1800000b-06-0050-5af6, 18800006-1800000b-11-0035-3810]
+   *  lastRowKey = "18800006-1800000b-06-0019-caac-65140-40815"
+   *  and the response from this method [18800006-1800000b-06-0050-5af6, 18800006-1800000b-11-0035-3810]
+   * </pre>
+   * 
+   * <p>
+   * The result is a subList view of 'keys', not a copy.
+   * </p>
+   * 
+   * @param keys
+   *          keys; must not be null or empty
+   * @param lastRowKey
+   *          last row key of the previous partial response
+   * @return List<String>
+   *          the keys that still have to be processed
+   */
+  @VisibleForTesting
+  List<String> getUnprocessedSublistOfKeys(List<String> keys,
+      String lastRowKey) {
+    Assert.notEmpty(keys, "'keys' must not be null");
+    Assert.hasText(lastRowKey, "'lastRowKey' must not be null");
+    String processedKey = getTokens(lastRowKey, 5);
+    // indexOf yields -1 when nothing matches, which makes the start index 0.
+    int firstUnprocessed = keys.indexOf(processedKey) + 1;
+    return keys.subList(firstUnprocessed, keys.size());
+  }
+
+  /**
+   * Returns the first 'noOfTokens' tokens from the given key; token delimiter
+   * "-";.
+   * 
+   * @param key
+   *          given key
+   * @param noOfTokens
+   *          number of tokens to retrieve
+   * @return the tokens
+   */
+  @VisibleForTesting
+  String getTokens(String key, int noOfTokens) {
+    String delimeter = HBaseConfigConstants.PCAP_KEY_DELIMETER;
+    String regex = "\\" + delimeter;
+    String[] keyTokens = key.split(regex);
+    Assert.isTrue(noOfTokens < keyTokens.length,
+        "Invalid value for 'noOfTokens'");
+    StringBuffer sbf = new StringBuffer();
+    for (int i = 0; i < noOfTokens; i++) {
+      sbf.append(keyTokens[i]);
+      if (i != (noOfTokens - 1)) {
+        sbf.append(HBaseConfigConstants.PCAP_KEY_DELIMETER);
+      }
+
+    }
+    return sbf.toString();
+  }
+
+  /**
+   * Process key.
+   * 
+   * @param pcapsResponse
+   *          the pcaps response
+   * @param key
+   *          the key
+   * @param startTime
+   *          the start time
+   * @param endTime
+   *          the end time
+   * @param isPartialResponse
+   *          the is partial response
+   * @param includeDuplicateLastRow
+   *          the include duplicate last row
+   * @param maxResultSize
+   *          the max result size
+   * @return the pcaps response
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  @VisibleForTesting
+  PcapsResponse processKey(PcapsResponse pcapsResponse, String key,
+      long startTime, long endTime, boolean isPartialResponse,
+      boolean includeDuplicateLastRow, long maxResultSize) throws IOException {
+    HTable table = null;
+    Scan scan = null;
+    List<Cell> scannedCells = null;
+    try {
+      // 1. Create start and stop row for the key;
+      Map<String, String> keysMap = createStartAndStopRowKeys(key,
+          isPartialResponse, includeDuplicateLastRow);
+
+      // 2. if the input key contains all fragments (7) and it is not part
+      // of previous partial response (isPartialResponse),
+      // 'keysMap' will be null; do a Get; currently not doing any
+      // response size related checks for Get;
+      // by default all cells from a specific row are sorted by timestamp
+      if (keysMap == null) {
+        Get get = createGetRequest(key, startTime, endTime);
+        List<Cell> cells = executeGetRequest(table, get);
+        for (Cell cell : cells) {
+          pcapsResponse.addPcaps(CellUtil.cloneValue(cell));
+        }
+        return pcapsResponse;
+      }
+      // 3. Create and execute Scan request
+      scan = createScanRequest(pcapsResponse, keysMap, startTime, endTime,
+          maxResultSize);
+      scannedCells = executeScanRequest(table, scan);
+      LOGGER.info("scannedCells size :" + scannedCells.size());
+      addToResponse(pcapsResponse, scannedCells, maxResultSize);
+
+    } catch (IOException e) {
+      LOGGER.error("Exception occurred while fetching Pcaps for the keys :"
+          + key, e);
+      if (e instanceof ZooKeeperConnectionException
+          || e instanceof MasterNotRunningException
+          || e instanceof NoServerForRegionException) {
+        int maxRetryLimit = ConfigurationUtil.getConnectionRetryLimit();
+        System.out.println("maxRetryLimit =" + maxRetryLimit);
+        for (int attempt = 1; attempt <= maxRetryLimit; attempt++) {
+          System.out.println("attempting  =" + attempt);
+          try {
+            HBaseConfigurationUtil.closeConnection(); // closing the
+            // existing
+            // connection
+            // and retry,
+            // it will
+            // create a new
+            // HConnection
+            scannedCells = executeScanRequest(table, scan);
+            addToResponse(pcapsResponse, scannedCells, maxResultSize);
+            break;
+          } catch (IOException ie) {
+            if (attempt == maxRetryLimit) {
+              LOGGER.error("Throwing the exception after retrying "
+                  + maxRetryLimit + " times.");
+              throw e;
+            }
+          }
+        }
+      }
+
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+    }
+    return pcapsResponse;
+  }
+
+  /**
+   * Adds the scanned cells, sorted by PcapHelper's cell timestamp comparator, to
+   * the response; marks it PARTIAL when isResonseSizeWithinLimit reports false.
+   * 
+   * @param pcapsResponse the pcaps response
+   * @param scannedCells the scanned cells; may be null or empty
+   * @param maxResultSize the max result size
+   */
+  private void addToResponse(PcapsResponse pcapsResponse,
+      List<Cell> scannedCells, long maxResultSize) {
+    String lastKeyFromCurrentScan = null;
+    if (scannedCells != null && !scannedCells.isEmpty()) {
+      // row key of the last scanned cell, taken before the sort; UTF-8 decoded
+      Cell lastScannedCell = scannedCells.get(scannedCells.size() - 1);
+      lastKeyFromCurrentScan = Bytes.toString(CellUtil.cloneRow(lastScannedCell));
+      Collections.sort(scannedCells, PcapHelper.getCellTimestampComparator());
+      for (Cell sortedCell : scannedCells) {
+        pcapsResponse.addPcaps(CellUtil.cloneValue(sortedCell));
+      }
+    }
+    // 4. calculate the response size
+    if (!pcapsResponse.isResonseSizeWithinLimit(maxResultSize)) {
+      pcapsResponse.setStatus(PcapsResponse.Status.PARTIAL); // size reached
+      if (lastKeyFromCurrentScan != null) {
+        pcapsResponse.setLastRowKey(lastKeyFromCurrentScan);
+      }
+    }
+  }
+
+  /**
+   * Builds the start and stop row keys for scanning the rows of 'key'. The key
+   * is split into tokens on the pcap key delimiter ('-'). Assuming
+   * configuredTokensInRowKey=7 and minimumTokensIninputKey=5:
+   * <pre>
+   * a) 5 tokens ("srcIp-dstIp-protocol-srcPort-dstPort")
+   *      startKey = key + "-" + min limit per missing token (documented as 00000)
+   *      stopKey  = key + "-" + max limit per missing token (documented as 99999)
+   * b) 6 tokens ("srcIp-dstIp-protocol-srcPort-dstPort-id1")
+   *      startKey = "srcIp-dstIp-protocol-srcPort-dstPort-id1-" + min limit
+   *      stopKey  = "srcIp-dstIp-protocol-srcPort-dstPort-id1-" + max limit
+   * c) 7 tokens ("srcIp-dstIp-protocol-srcPort-dstPort-id1-id2")
+   *      not a partial response key: returns null (the caller does a Get)
+   *      partial response key:
+   *        startKey = "srcIp-dstIp-protocol-srcPort-dstPort-id1-(id2+1)", or
+   *                   the key itself when 'includeDuplicateLastRow' is set
+   *        stopKey  = first 5 tokens + "-" + max limit for each remaining token
+   * </pre>
+   * 
+   * @param key
+   *          the key
+   * @param isLastRowKey
+   *          if the key is part of partial response
+   * @param includeDuplicateLastRow
+   *          the include duplicate last row
+   * @return map holding the START_KEY and END_KEY entries, or null (see c)
+   */
+  @VisibleForTesting
+  Map<String, String> createStartAndStopRowKeys(String key,
+      boolean isLastRowKey, boolean includeDuplicateLastRow) {
+    String delimeter = HBaseConfigConstants.PCAP_KEY_DELIMETER;
+    String regex = "\\" + delimeter;
+    String[] keyTokens = key.split(regex);
+
+    String startKey = null;
+    String endKey = null;
+    Map<String, String> map = new HashMap<String, String>();
+
+    int configuredTokensInRowKey = ConfigurationUtil
+        .getConfiguredTokensInRowkey();
+    int minimumTokensIninputKey = ConfigurationUtil
+        .getMinimumTokensInInputkey();
+    Assert
+        .isTrue(
+            minimumTokensIninputKey <= configuredTokensInRowKey,
+            "tokens in the input key (separated by '-'), must be less than or equal to the tokens used in hbase table row key ");
+    // in case the input key contains 'configuredTokensInRowKey' tokens and
+    // it is NOT a partial response key, return null so that the caller does
+    // a Get instead of a Scan
+    if (keyTokens.length == configuredTokensInRowKey) {
+      if (!isLastRowKey) {
+        return null;
+      }
+      // it is a partial response key; 'startKey' is the input key with its last
+      // token incremented (unless includeDuplicateLastRow); 'endKey' replaces the
+      // tokens after the first minimumTokensIninputKey ones with the max limit.
+      // NOTE(review): Integer.valueOf(..) + 1 drops leading zeros - confirm.
+      if (keyTokens.length == minimumTokensIninputKey) {
+        return null;
+      }
+      int appendingTokenSlots = configuredTokensInRowKey
+          - minimumTokensIninputKey;
+      if (appendingTokenSlots > 0) {
+        String partialKey = getTokens(key, minimumTokensIninputKey);
+        StringBuffer sbfStartNew = new StringBuffer(partialKey);
+        StringBuffer sbfEndNew = new StringBuffer(partialKey);
+        for (int i = 0; i < appendingTokenSlots; i++) {
+          if (i == (appendingTokenSlots - 1)) {
+            if (!includeDuplicateLastRow) {
+              sbfStartNew
+                  .append(HBaseConfigConstants.PCAP_KEY_DELIMETER)
+                  .append(
+                      Integer.valueOf(keyTokens[minimumTokensIninputKey + i]) + 1);
+            } else {
+              sbfStartNew.append(HBaseConfigConstants.PCAP_KEY_DELIMETER)
+                  .append(keyTokens[minimumTokensIninputKey + i]);
+            }
+          } else {
+            sbfStartNew.append(HBaseConfigConstants.PCAP_KEY_DELIMETER).append(
+                keyTokens[minimumTokensIninputKey + i]);
+          }
+          sbfEndNew.append(HBaseConfigConstants.PCAP_KEY_DELIMETER).append(
+              getMaxLimitForAppendingTokens());
+        }
+        startKey = sbfStartNew.toString();
+        endKey = sbfEndNew.toString();
+      }
+    } else {
+      StringBuffer sbfStart = new StringBuffer(key);
+      StringBuffer sbfEnd = new StringBuffer(key);
+      for (int i = keyTokens.length; i < configuredTokensInRowKey; i++) {
+        sbfStart.append(HBaseConfigConstants.PCAP_KEY_DELIMETER).append(
+            getMinLimitForAppendingTokens());
+        sbfEnd.append(HBaseConfigConstants.PCAP_KEY_DELIMETER).append(
+            getMaxLimitForAppendingTokens());
+      }
+      startKey = sbfStart.toString();
+      endKey = sbfEnd.toString();
+    }
+    map.put(HBaseConfigConstants.START_KEY, startKey);
+    map.put(HBaseConfigConstants.END_KEY, endKey);
+
+    return map;
+  }
+
+  /**
+   * Checks that at least one of the two inputs is present: returns false if
+   * 'keys' is empty or null AND 'lastRowKey' is null or empty; otherwise
+   * returns true.
+   * 
+   * @param keys
+   *          input row keys
+   * @param lastRowKey
+   *          partial response key
+   * @return true when 'keys' has at least one element or 'lastRowKey' is a
+   *         non-empty string
+   */
+  @VisibleForTesting
+  boolean checkIfValidInput(List<String> keys, String lastRowKey) {
+    boolean hasKeys = !CollectionUtils.isEmpty(keys);
+    boolean hasLastRowKey = StringUtils.isNotEmpty(lastRowKey);
+    return hasKeys || hasLastRowKey;
+  }
+
+  /**
+   * Executes the given Get request and closes the table it was run against.
+   * 
+   * @param table ignored; the table is looked up from the shared connection
+   * @param get Get
+   * @return List<Cell>
+   * @throws IOException Signals that an I/O exception has occurred.
+   */
+  private List<Cell> executeGetRequest(HTable table, Get get)
+      throws IOException {
+    LOGGER.info("Get :" + get.toString());
+    table = (HTable) HBaseConfigurationUtil.getConnection().getTable(
+        ConfigurationUtil.getTableName());
+    try {
+      Result result = table.get(get);
+      return result.getColumnCells(ConfigurationUtil.getColumnFamily(),
+          ConfigurationUtil.getColumnQualifier());
+    } finally {
+      // the caller never sees this table instance, so it must be closed here
+      table.close();
+    }
+  }
+
+  /**
+   * Runs the given Scan request and collects, over all returned rows, the
+   * cells of the configured column family and qualifier.
+   *
+   * @param table
+   *          hbase table; the passed-in value is not used, it is overwritten
+   *          with the table obtained through
+   *          HBaseConfigurationUtil.getConnection() for the table named by
+   *          the 'hbase.table.name' property
+   * @param scan
+   *          the scan to execute
+   * @return the cells of all scanned rows, in scan order; empty if nothing
+   *         matched
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  private List<Cell> executeScanRequest(HTable table, Scan scan)
+      throws IOException {
+    LOGGER.info("Scan :" + scan.toString());
+    table = (HTable) HBaseConfigurationUtil.getConnection().getTable(
+        ConfigurationUtil.getConfiguration().getString("hbase.table.name"));
+    List<Cell> scannedCells = new ArrayList<Cell>();
+    ResultScanner resultScanner = table.getScanner(scan);
+    try {
+      for (Result result = resultScanner.next(); result != null; result = resultScanner
+          .next()) {
+        List<Cell> cells = result.getColumnCells(
+            ConfigurationUtil.getColumnFamily(),
+            ConfigurationUtil.getColumnQualifier());
+        if (cells != null) {
+          scannedCells.addAll(cells);
+        }
+      }
+    } finally {
+      // always release the scanner, even when iteration fails half way
+      resultScanner.close();
+    }
+    return scannedCells;
+  }
+
+  /**
+   * Builds a Get for the given row key, restricted to the configured column
+   * family and qualifier, the configured maximum number of versions and the
+   * given time range.
+   *
+   * @param key
+   *          the row key
+   * @param startTime
+   *          the start time; a negative value means "not specified"
+   * @param endTime
+   *          the end time; a negative value means "not specified"
+   * @return the prepared Get
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  @VisibleForTesting
+  Get createGetRequest(String key, long startTime, long endTime)
+      throws IOException {
+    Get request = new Get(Bytes.toBytes(key));
+
+    // column family first, then the specific column inside that family
+    request.addFamily(ConfigurationUtil.getColumnFamily());
+    request.addColumn(ConfigurationUtil.getColumnFamily(),
+        ConfigurationUtil.getColumnQualifier());
+
+    // number of cell versions to fetch
+    request.setMaxVersions(ConfigurationUtil.getMaxVersions());
+
+    // optional time window
+    setTimeRangeOnGet(request, startTime, endTime);
+    return request;
+  }
+
+  /**
+   * Builds a Scan over the key range held in 'keysMap', restricted to the
+   * configured column family and qualifier, the configured maximum number of
+   * versions and the given time range.
+   *
+   * @param pcapsResponse
+   *          the pcaps response collected so far; its current size is
+   *          subtracted from 'maxResultSize' to compute the remaining budget
+   * @param keysMap
+   *          map holding the scan boundaries under
+   *          HBaseConfigConstants.START_KEY and HBaseConfigConstants.END_KEY
+   * @param startTime
+   *          the start time; a negative value means "not specified"
+   * @param endTime
+   *          the end time; a negative value means "not specified"
+   * @param maxResultSize
+   *          the max result size of the overall response
+   * @return the prepared scan
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  @VisibleForTesting
+  Scan createScanRequest(PcapsResponse pcapsResponse,
+      Map<String, String> keysMap, long startTime, long endTime,
+      long maxResultSize) throws IOException {
+    Scan scan = new Scan();
+    // set column family, qualifier
+    scan.addColumn(ConfigurationUtil.getColumnFamily(),
+        ConfigurationUtil.getColumnQualifier());
+
+    // set start and stop keys; Bytes.toBytes is used (as in createGetRequest)
+    // so that the key encoding does not depend on the platform default charset
+    scan.setStartRow(Bytes.toBytes(keysMap.get(HBaseConfigConstants.START_KEY)));
+    scan.setStopRow(Bytes.toBytes(keysMap.get(HBaseConfigConstants.END_KEY)));
+
+    // set max results size : remaining size = max results size - ( current
+    // pcaps response size + possible maximum row size)
+    long remainingSize = maxResultSize
+        - (pcapsResponse.getResponseSize() + ConfigurationUtil.getMaxRowSize());
+
+    // a non-positive remainder leaves the scan's default result size in place
+    if (remainingSize > 0) {
+      scan.setMaxResultSize(remainingSize);
+    }
+    // set max versions
+    scan.setMaxVersions(ConfigurationUtil.getConfiguration().getInt(
+        "hbase.table.column.maxVersions"));
+
+    // set time range
+    setTimeRangeOnScan(scan, startTime, endTime);
+    return scan;
+  }
+
+  /**
+   * Applies the requested time window to the scan. When both bounds are
+   * negative the scan is left untouched. A single negative bound is treated
+   * as open: 0 for the start, Long.MAX_VALUE for the end. Non-negative bounds
+   * are converted with PcapHelper.convertToDataCreationTimeUnit.
+   *
+   * @param scan
+   *          the scan
+   * @param startTime
+   *          the start time
+   * @param endTime
+   *          the end time
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  private void setTimeRangeOnScan(Scan scan, long startTime, long endTime)
+      throws IOException {
+    if (startTime < 0 && endTime < 0) {
+      // no bound supplied at all: nothing to restrict
+      return;
+    }
+    long from = (startTime < 0) ? 0 : PcapHelper
+        .convertToDataCreationTimeUnit(startTime);
+    long to = (endTime < 0) ? Long.MAX_VALUE : PcapHelper
+        .convertToDataCreationTimeUnit(endTime);
+    Assert.isTrue(from < to,
+        "startTime value must be less than endTime value");
+    scan.setTimeRange(from, to);
+  }
+
+  /**
+   * Applies the requested time window to the get. When both bounds are
+   * negative the get is left untouched. A single negative bound is treated as
+   * open: 0 for the start, Long.MAX_VALUE for the end. Non-negative bounds are
+   * converted with PcapHelper.convertToDataCreationTimeUnit.
+   *
+   * @param get
+   *          the get
+   * @param startTime
+   *          the start time
+   * @param endTime
+   *          the end time
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  private void setTimeRangeOnGet(Get get, long startTime, long endTime)
+      throws IOException {
+    if (startTime < 0 && endTime < 0) {
+      // no bound supplied at all: nothing to restrict
+      return;
+    }
+    long from = (startTime < 0) ? 0 : PcapHelper
+        .convertToDataCreationTimeUnit(startTime);
+    long to = (endTime < 0) ? Long.MAX_VALUE : PcapHelper
+        .convertToDataCreationTimeUnit(endTime);
+    Assert.isTrue(from < to,
+        "startTime value must be less than endTime value");
+    get.setTimeRange(from, to);
+  }
+
+  /**
+   * Builds the smallest value an appended key token can take: a string of
+   * '0' characters whose length is ConfigurationUtil.getAppendingTokenDigits().
+   *
+   * @return the min limit for appending tokens
+   */
+  private String getMinLimitForAppendingTokens() {
+    StringBuilder zeros = new StringBuilder();
+    int remaining = ConfigurationUtil.getAppendingTokenDigits();
+    while (remaining-- > 0) {
+      zeros.append('0');
+    }
+    return zeros.toString();
+  }
+
+  /**
+   * Builds the largest value an appended key token can take: a string of
+   * '9' characters whose length is ConfigurationUtil.getAppendingTokenDigits().
+   *
+   * @return the max limit for appending tokens
+   */
+  private String getMaxLimitForAppendingTokens() {
+    StringBuilder nines = new StringBuilder();
+    int remaining = ConfigurationUtil.getAppendingTokenDigits();
+    while (remaining-- > 0) {
+      nines.append('9');
+    }
+    return nines.toString();
+  }
+
+  /**
+   * Command line entry point: fetches the pcaps for the given keys and writes
+   * them to the given output file.
+   *
+   * @param args
+   *          the arguments: args[1] is the output file name, args[2] a comma
+   *          separated list of keys, args[3] (optional) the start time and
+   *          args[4] (optional) the end time; args[0] is not read by this
+   *          method
+   *
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  public static void main(String[] args) throws IOException {
+    // args[2] (the keys) is read unconditionally below, so at least three
+    // arguments are required
+    if (args == null || args.length < 3) {
+      usage();
+      return;
+    }
+    String outputFileName = args[1];
+    List<String> keys = Arrays.asList(StringUtils.split(args[2], ","));
+    System.out.println("Geting keys " + keys);
+    long startTime = 0;
+    long endTime = Long.MAX_VALUE;
+    if (args.length > 3) {
+      startTime = Long.parseLong(args[3]);
+    }
+    if (args.length > 4) {
+      endTime = Long.parseLong(args[4]);
+    }
+    System.out.println("With start time " + startTime + " and end time "
+        + endTime);
+    PcapGetterHBaseImpl downloader = new PcapGetterHBaseImpl();
+    PcapsResponse pcaps = downloader.getPcaps(keys, null, startTime, endTime,
+        false, false, 6);
+    File file = new File(outputFileName);
+    // truncate any existing file first, then append the pcap bytes
+    FileUtils.write(file, "", false);
+    FileUtils.writeByteArrayToFile(file, pcaps.getPcaps(), true);
+  }
+
+  /**
+   * Prints the command line usage to standard out. The argument list matches
+   * what main reads: output file, comma separated keys and the optional
+   * start and end times.
+   */
+  private static void usage() {
+    System.out.println("java " + PcapGetterHBaseImpl.class.getName() // $codepro.audit.disable
+        // debuggingCode
+        + " <zk quorum> <output file> <keys (comma separated)> [start time] [end time]");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/PcapHelper.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/PcapHelper.java b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/PcapHelper.java
new file mode 100644
index 0000000..5224945
--- /dev/null
+++ b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/PcapHelper.java
@@ -0,0 +1,205 @@
+package com.opensoc.pcapservice;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.mortbay.log.Log;
+import org.springframework.util.Assert;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * utility class which holds methods related to time conversions, building
+ * reverse keys.
+ */
+public class PcapHelper {
+
+  /** The Constant LOGGER. */
+  private static final Logger LOGGER = Logger.getLogger(PcapHelper.class);
+
+  /** The cell timestamp comparator. */
+  private static CellTimestampComparator CELL_TIMESTAMP_COMPARATOR = new CellTimestampComparator();
+
+  /**
+   * The Enum TimeUnit.
+   */
+  public enum TimeUnit {
+
+    /** The seconds. */
+    SECONDS,
+    /** The millis. */
+    MILLIS,
+    /** The micros. */
+    MICROS,
+    /** The unknown. */
+    UNKNOWN
+  };
+
+  /**
+   * Converts the given time to the 'hbase' data creation time unit.
+   * 
+   * @param inputTime
+   *          the input time
+   * @return the long
+   */
+  public static long convertToDataCreationTimeUnit(long inputTime) {
+    if (inputTime <= 9999999999L) {
+      return convertSecondsToDataCreationTimeUnit(inputTime); // input time unit
+                                                              // is in seconds
+    } else if (inputTime <= 9999999999999L) {
+      return convertMillisToDataCreationTimeUnit(inputTime); // input time unit
+                                                             // is in millis
+    } else if (inputTime <= 9999999999999999L) {
+      return convertMicrosToDataCreationTimeUnit(inputTime); // input time unit
+                                                             // it in micros
+    }
+    return inputTime; // input time unit is unknown
+  }
+
+  /**
+   * Returns the 'hbase' data creation time unit by reading
+   * 'hbase.table.data.time.unit' property in 'hbase-config' properties file; If
+   * none is mentioned in properties file, returns <code>TimeUnit.UNKNOWN</code>
+   * 
+   * @return TimeUnit
+   */
+  @VisibleForTesting
+  public static TimeUnit getDataCreationTimeUnit() {
+    String timeUnit = ConfigurationUtil.getConfiguration().getString(
+        "hbase.table.data.time.unit");
+    LOGGER.debug("hbase.table.data.time.unit=" + timeUnit.toString());
+    if (StringUtils.isNotEmpty(timeUnit)) {
+      return TimeUnit.valueOf(timeUnit);
+    }
+    return TimeUnit.UNKNOWN;
+  }
+
+  /**
+   * Convert seconds to data creation time unit.
+   * 
+   * @param inputTime
+   *          the input time
+   * @return the long
+   */
+  @VisibleForTesting
+  public static long convertSecondsToDataCreationTimeUnit(long inputTime) {
+    System.out.println("convert Seconds To DataCreation TimeUnit");
+    TimeUnit dataCreationTimeUnit = getDataCreationTimeUnit();
+    if (TimeUnit.SECONDS == dataCreationTimeUnit) {
+      return inputTime;
+    } else if (TimeUnit.MILLIS == dataCreationTimeUnit) {
+      return inputTime * 1000;
+    } else if (TimeUnit.MICROS == dataCreationTimeUnit) {
+      return inputTime * 1000 * 1000;
+    }
+    return inputTime;
+  }
+
+  /**
+   * Builds the reverseKey to fetch the pcaps in the reverse traffic
+   * (destination to source).
+   * 
+   * @param key
+   *          indicates hbase rowKey (partial or full) in the format
+   *          "srcAddr-dstAddr-protocol-srcPort-dstPort-fragment"
+   * @return String indicates the key in the format
+   *         "dstAddr-srcAddr-protocol-dstPort-srcPort"
+   */
+  public static String reverseKey(String key) {
+    Assert.hasText(key, "key must not be null or empty");
+    String delimeter = HBaseConfigConstants.PCAP_KEY_DELIMETER;
+    String regex = "\\" + delimeter;
+    StringBuffer sb = new StringBuffer();
+    try {
+      String[] tokens = key.split(regex);
+      Assert
+          .isTrue(
+              (tokens.length == 5 || tokens.length == 6 || tokens.length == 7),
+              "key is not in the format : 'srcAddr-dstAddr-protocol-srcPort-dstPort-{ipId-fragment identifier}'");
+      sb.append(tokens[1]).append(delimeter).append(tokens[0])
+          .append(delimeter).append(tokens[2]).append(delimeter)
+          .append(tokens[4]).append(delimeter).append(tokens[3]);
+    } catch (Exception e) {
+      Log.warn("Failed to reverse the key. Reverse scan won't be performed.", e);
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Builds the reverseKeys to fetch the pcaps in the reverse traffic
+   * (destination to source). If all keys in the input are not in the expected
+   * format, it returns an empty list;
+   * 
+   * @param keys
+   *          indicates list of hbase rowKeys (partial or full) in the format
+   *          "srcAddr-dstAddr-protocol-srcPort-dstPort-fragment"
+   * @return List<String> indicates the list of keys in the format
+   *         "dstAddr-srcAddr-protocol-dstPort-srcPort"
+   */
+  public static List<String> reverseKey(List<String> keys) {
+    Assert.notEmpty(keys, "'keys' must not be null or empty");
+    List<String> reverseKeys = new ArrayList<String>();
+    for (String key : keys) {
+      if (key != null) {
+        String reverseKey = reverseKey(key);
+        if (StringUtils.isNotEmpty(reverseKey)) {
+          reverseKeys.add(reverseKey);
+        }
+      }
+    }
+    return reverseKeys;
+  }
+
+  /**
+   * Returns Comparator for sorting pcaps cells based on the timestamp (dsc).
+   * 
+   * @return CellTimestampComparator
+   */
+  public static CellTimestampComparator getCellTimestampComparator() {
+    return CELL_TIMESTAMP_COMPARATOR;
+  }
+
+  /**
+   * Convert millis to data creation time unit.
+   * 
+   * @param inputTime
+   *          the input time
+   * @return the long
+   */
+  @VisibleForTesting
+  private static long convertMillisToDataCreationTimeUnit(long inputTime) {
+    System.out.println("convert Millis To DataCreation TimeUnit");
+    TimeUnit dataCreationTimeUnit = getDataCreationTimeUnit();
+    if (TimeUnit.SECONDS == dataCreationTimeUnit) {
+      return (inputTime / 1000);
+    } else if (TimeUnit.MILLIS == dataCreationTimeUnit) {
+      return inputTime;
+    } else if (TimeUnit.MICROS == dataCreationTimeUnit) {
+      return inputTime * 1000;
+    }
+    return inputTime;
+  }
+
+  /**
+   * Convert micros to data creation time unit.
+   * 
+   * @param inputTime
+   *          the input time
+   * @return the long
+   */
+  @VisibleForTesting
+  private static long convertMicrosToDataCreationTimeUnit(long inputTime) {
+    System.out.println("convert Micros To DataCreation TimeUnit");
+    TimeUnit dataCreationTimeUnit = getDataCreationTimeUnit();
+    if (TimeUnit.SECONDS == dataCreationTimeUnit) {
+      return inputTime / (1000 * 1000);
+    } else if (TimeUnit.MILLIS == dataCreationTimeUnit) {
+      return inputTime / 1000;
+    } else if (TimeUnit.MICROS == dataCreationTimeUnit) {
+      return inputTime;
+    }
+    return inputTime;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/PcapReceiverImplRestEasy.java b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
new file mode 100644
index 0000000..98e855e
--- /dev/null
+++ b/metron-streaming/Metron-Pcap_Service/src/main/java/com/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
@@ -0,0 +1,250 @@
+package com.opensoc.pcapservice;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.opensoc.pcap.PcapUtils;
+
+@Path("/")
+public class PcapReceiverImplRestEasy {
+
+	/** The Constant LOGGER. */
+	private static final Logger LOGGER = Logger
+			.getLogger(PcapReceiverImplRestEasy.class);
+
+	/** The Constant HEADER_CONTENT_DISPOSITION_NAME. */
+	private static final String HEADER_CONTENT_DISPOSITION_NAME = "Content-Disposition";
+
+	/** The Constant HEADER_CONTENT_DISPOSITION_VALUE. */
+	private static final String HEADER_CONTENT_DISPOSITION_VALUE = "attachment; filename=\"managed-threat.pcap\"";
+
+	/** partial response key header name. */
+	private static final String HEADER_PARTIAL_RESPONE_KEY = "lastRowKey";
+
+	/**
+	 * Fetches the pcaps for the given keys.
+	 * 
+	 * @param keys
+	 *            row keys; each entry may itself be a comma separated list
+	 * @param lastRowKey
+	 *            key returned by a previous partial response
+	 * @param startTime
+	 *            the start time; defaults to -1 when not supplied
+	 * @param endTime
+	 *            the end time; defaults to -1 when not supplied
+	 * @param includeDuplicateLastRow
+	 *            passed through to the pcap getter
+	 * @param includeReverseTraffic
+	 *            passed through to the pcap getter
+	 * @param maxResponseSize
+	 *            requested maximum response size, validated by
+	 *            ConfigurationUtil.validateMaxResultSize
+	 * @param response
+	 *            servlet response on which the download headers are set
+	 * @return '204 No Content' if 'keys' is empty or nothing was found; '206
+	 *         Partial Content' with the 'lastRowKey' header if the pcaps
+	 *         response status is PARTIAL; '200 OK' with the pcaps otherwise
+	 * @throws IOException
+	 *             if fetching the pcaps fails
+	 */
+	@GET
+	@Path("pcapGetter/getPcapsByKeys")
+	public Response getPcapsByKeys(
+			@QueryParam("keys") List<String> keys,
+			@QueryParam("lastRowKey") String lastRowKey,
+			@DefaultValue("-1") @QueryParam("startTime") long startTime,
+			@DefaultValue("-1") @QueryParam("endTime") long endTime,
+			@QueryParam("includeDuplicateLastRow") boolean includeDuplicateLastRow,
+			@QueryParam("includeReverseTraffic") boolean includeReverseTraffic,
+			@QueryParam("maxResponseSize") String maxResponseSize,
+			@Context HttpServletResponse response) throws IOException {
+		PcapsResponse pcapResponse = null;
+
+		if (keys == null || keys.isEmpty()) {
+			return missingParameter("'keys' must not be null or empty");
+		}
+
+		try {
+			IPcapGetter pcapGetter = PcapGetterHBaseImpl.getInstance();
+			pcapResponse = pcapGetter.getPcaps(parseKeys(keys), lastRowKey,
+					startTime, endTime, includeReverseTraffic,
+					includeDuplicateLastRow,
+					ConfigurationUtil.validateMaxResultSize(maxResponseSize));
+			// plain concatenation so that a null response is logged instead of
+			// raising a NullPointerException before the null check below
+			LOGGER.info("pcaps response in REST layer =" + pcapResponse);
+
+			// return http status '204 No Content' if the pcaps response size is
+			// 0
+			if (pcapResponse == null || pcapResponse.getResponseSize() == 0) {
+				return Response.status(Response.Status.NO_CONTENT).build();
+			}
+
+			response.setHeader(HEADER_CONTENT_DISPOSITION_NAME,
+					HEADER_CONTENT_DISPOSITION_VALUE);
+
+			// return http status '206 Partial Content', the partial response
+			// file and 'lastRowKey' header, if the pcaps response status is
+			// 'PARTIAL'
+			if (pcapResponse.getStatus() == PcapsResponse.Status.PARTIAL) {
+				response.setHeader(HEADER_PARTIAL_RESPONE_KEY,
+						pcapResponse.getLastRowKey());
+
+				return Response
+						.ok(pcapResponse.getPcaps(),
+								MediaType.APPLICATION_OCTET_STREAM).status(206)
+						.build();
+			}
+
+		} catch (IOException e) {
+			LOGGER.error(
+					"Exception occurred while fetching Pcaps for the keys :"
+							+ keys.toString(), e);
+			throw e;
+		}
+
+		// return http status '200 OK' along with the complete pcaps response
+		// file, and headers
+		return Response
+				.ok(pcapResponse.getPcaps(), MediaType.APPLICATION_OCTET_STREAM)
+				.status(200).build();
+	}
+
+	/**
+	 * Fetches the pcaps whose keys lie in the given key range.
+	 * 
+	 * @param startKey
+	 *            first key of the range; must not be null or empty
+	 * @param endKey
+	 *            last key of the range; must not be null or empty
+	 * @param maxResponseSize
+	 *            requested maximum response size, validated by
+	 *            ConfigurationUtil.validateMaxResultSize
+	 * @param startTime
+	 *            the start time; defaults to -1 when not supplied
+	 * @param endTime
+	 *            the end time; defaults to -1 when not supplied
+	 * @param servlet_response
+	 *            servlet response on which the download headers are set
+	 * @return '204 No Content' if a key is missing or nothing was found; '200
+	 *         OK' with the pcaps otherwise
+	 * @throws IOException
+	 *             if fetching the pcaps fails
+	 */
+	@GET
+	@Path("/pcapGetter/getPcapsByKeyRange")
+	public Response getPcapsByKeyRange(
+			@QueryParam("startKey") String startKey,
+			@QueryParam("endKey") String endKey,
+			@QueryParam("maxResponseSize") String maxResponseSize,
+			@DefaultValue("-1") @QueryParam("startTime") long startTime,
+			@DefaultValue("-1") @QueryParam("endTime") long endTime,
+			@Context HttpServletResponse servlet_response) throws IOException {
+
+		if (StringUtils.isEmpty(startKey)) {
+			return missingParameter("'start key' must not be null or empty");
+		}
+
+		if (StringUtils.isEmpty(endKey)) {
+			return missingParameter("'end key' must not be null or empty");
+		}
+
+		byte[] response = null;
+		try {
+			IPcapScanner pcapScanner = PcapScannerHBaseImpl.getInstance();
+			response = pcapScanner.getPcaps(startKey, endKey,
+					ConfigurationUtil.validateMaxResultSize(maxResponseSize),
+					startTime, endTime);
+			if (response == null || response.length == 0) {
+				return Response.status(Response.Status.NO_CONTENT).build();
+			}
+			servlet_response.setHeader(HEADER_CONTENT_DISPOSITION_NAME,
+					HEADER_CONTENT_DISPOSITION_VALUE);
+
+		} catch (IOException e) {
+			LOGGER.error(
+					"Exception occurred while fetching Pcaps for the key range : startKey="
+							+ startKey + ", endKey=" + endKey, e);
+			throw e;
+		}
+		// return http status '200 OK' along with the complete pcaps response
+		// file, and headers
+		return Response.ok(response, MediaType.APPLICATION_OCTET_STREAM)
+				.status(200).build();
+	}
+
+	/**
+	 * Fetches the pcaps of the session identified by the given addresses,
+	 * protocol and ports. The session key is built with
+	 * PcapUtils.getSessionKey and the default result size is used.
+	 * 
+	 * @param srcIp
+	 *            source ip; must not be null or empty
+	 * @param dstIp
+	 *            destination ip; must not be null or empty
+	 * @param protocol
+	 *            protocol; must not be null or empty
+	 * @param srcPort
+	 *            source port; must not be null or empty
+	 * @param dstPort
+	 *            destination port; must not be null or empty
+	 * @param startTime
+	 *            the start time; defaults to -1 when not supplied
+	 * @param endTime
+	 *            the end time; defaults to -1 when not supplied
+	 * @param includeReverseTraffic
+	 *            passed through to the pcap getter; defaults to false
+	 * @param servlet_response
+	 *            servlet response on which the download headers are set
+	 * @return '204 No Content' if an identifier is missing or nothing was
+	 *         found; '200 OK' with the pcaps otherwise
+	 * @throws IOException
+	 *             if fetching the pcaps fails
+	 */
+	@GET
+	@Path("/pcapGetter/getPcapsByIdentifiers")
+	public Response getPcapsByIdentifiers(
+			@QueryParam("srcIp") String srcIp,
+			@QueryParam("dstIp") String dstIp,
+			@QueryParam("protocol") String protocol,
+			@QueryParam("srcPort") String srcPort,
+			@QueryParam("dstPort") String dstPort,
+			@DefaultValue("-1") @QueryParam("startTime") long startTime,
+			@DefaultValue("-1") @QueryParam("endTime") long endTime,
+			@DefaultValue("false") @QueryParam("includeReverseTraffic") boolean includeReverseTraffic,
+			@Context HttpServletResponse servlet_response)
+			throws IOException {
+
+		if (StringUtils.isEmpty(srcIp)) {
+			return missingParameter("'srcIp' must not be null or empty");
+		}
+
+		if (StringUtils.isEmpty(dstIp)) {
+			return missingParameter("'dstIp' must not be null or empty");
+		}
+
+		if (StringUtils.isEmpty(protocol)) {
+			return missingParameter("'protocol' must not be null or empty");
+		}
+
+		if (StringUtils.isEmpty(srcPort)) {
+			return missingParameter("'srcPort' must not be null or empty");
+		}
+
+		if (StringUtils.isEmpty(dstPort)) {
+			return missingParameter("'dstPort' must not be null or empty");
+		}
+
+		PcapsResponse response = null;
+		try {
+			String sessionKey = PcapUtils.getSessionKey(srcIp, dstIp, protocol,
+					srcPort, dstPort);
+			LOGGER.info("sessionKey =" + sessionKey);
+			IPcapGetter pcapGetter = PcapGetterHBaseImpl.getInstance();
+			response = pcapGetter.getPcaps(Arrays.asList(sessionKey), null,
+					startTime, endTime, includeReverseTraffic, false,
+					ConfigurationUtil.getDefaultResultSize());
+			if (response == null || response.getResponseSize() == 0) {
+				return Response.status(Response.Status.NO_CONTENT).build();
+			}
+			servlet_response.setHeader(HEADER_CONTENT_DISPOSITION_NAME,
+					HEADER_CONTENT_DISPOSITION_VALUE);
+
+		} catch (IOException e) {
+			LOGGER.error(
+					"Exception occurred while fetching Pcaps by identifiers :",
+					e);
+			throw e;
+		}
+		// return http status '200 OK' along with the complete pcaps response
+		// file, and headers
+		return Response
+				.ok(response.getPcaps(), MediaType.APPLICATION_OCTET_STREAM)
+				.status(200).build();
+	}
+
+	/**
+	 * This method parses the each value in the List using delimiter ',' and
+	 * builds a new List. Null entries are skipped.
+	 * 
+	 * @param keys
+	 *            list of keys to be parsed
+	 * @return list of keys
+	 */
+	@VisibleForTesting
+	List<String> parseKeys(List<String> keys) {
+		List<String> parsedKeys = new ArrayList<String>();
+		for (String key : keys) {
+			// StringUtils.split returns null for a null input, which
+			// Arrays.asList cannot wrap
+			if (key == null) {
+				continue;
+			}
+			parsedKeys.addAll(Arrays.asList(StringUtils.split(
+					StringUtils.trim(key), ",")));
+		}
+		return parsedKeys;
+	}
+
+	/**
+	 * Builds the response returned when a mandatory query parameter is
+	 * missing: status '204 No Content' carrying the given message as entity.
+	 * 
+	 * @param message
+	 *            text describing the missing parameter
+	 * @return the response to send back
+	 */
+	private static Response missingParameter(String message) {
+		return Response.serverError().status(Response.Status.NO_CONTENT)
+				.entity(message).build();
+	}
+}