You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/04/29 18:28:35 UTC

[5/6] incubator-metron git commit: METRON-119 - Move PCAP infrastructure from HBase closes apache/incubator-metron#93

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/IPcapScanner.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/IPcapScanner.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/IPcapScanner.java
deleted file mode 100644
index 4101328..0000000
--- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/IPcapScanner.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.pcapservice;
-
-import java.io.IOException;
-
-/**
- * The Interface for all pcaps fetching methods based on key range.
- */
-public interface IPcapScanner {
-
-  /**
-   * Gets the pcaps for between startKey (inclusive) and endKey (exclusive).
-   * 
-   * @param startKey
-   *          the start key of a key range for which pcaps is to be retrieved.
-   * @param endKey
-   *          the end key of a key range for which pcaps is to be retrieved.
-   * @param maxResponseSize
-   *          indicates the maximum response size in MegaBytes(MB). User needs
-   *          to pass positive value and must be less than 60 (MB)
-   * @param startTime
-   *          the start time in system milliseconds to be used to filter the
-   *          pcaps. The value is set to '0' if the caller sends negative value
-   * @param endTime
-   *          the end time in system milliseconds to be used to filter the
-   *          pcaps. The value is set Long.MAX_VALUE if the caller sends
-   *          negative value
-   * @return byte array with all matching pcaps merged together
-   * @throws IOException
-   *           Signals that an I/O exception has occurred.
-   */
-  public byte[] getPcaps(String startKey, String endKey, long maxResponseSize,
-      long startTime, long endTime) throws IOException;
-
-  /**
-   * Gets the pcaps for between startKey (inclusive) and endKey (exclusive).
-   * 
-   * @param startKey
-   *          the start key (inclusive) of a key range for which pcaps is to be
-   *          retrieved.
-   * @param endKey
-   *          the end key (exclusive) of a key range for which pcaps is to be
-   *          retrieved.
-   * @return byte array with all matching pcaps merged together
-   * @throws IOException
-   *           Signals that an I/O exception has occurred.
-   */
-  public byte[] getPcaps(String startKey, String endKey) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapGetterHBaseImpl.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapGetterHBaseImpl.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapGetterHBaseImpl.java
deleted file mode 100644
index 58fecb9..0000000
--- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapGetterHBaseImpl.java
+++ /dev/null
@@ -1,826 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.pcapservice;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.annotation.Resource;
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.Response;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.NoServerForRegionException;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.log4j.Logger;
-import org.springframework.util.Assert;
-import org.springframework.util.CollectionUtils;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * Singleton class which integrates with HBase table and returns pcaps sorted by
- * timestamp(dsc) for the given list of keys. Creates HConnection if it is not
- * already created and the same connection instance is being used for all
- * requests
- * 
- * @author sheetal
- * @version $Revision: 1.0 $
- */
-
-@Path("/")
-public class PcapGetterHBaseImpl implements IPcapGetter {
-
-  /** The pcap getter h base. */
-  private static IPcapGetter pcapGetterHBase = null;
-
-  /** The Constant LOG. */
-  private static final Logger LOGGER = Logger
-      .getLogger(PcapGetterHBaseImpl.class);
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see com.cisco.opensoc.hbase.client.IPcapGetter#getPcaps(java.util.List,
-   * java.lang.String, long, long, boolean, boolean, long)
-   */
- 
-  
-	@GET
-	@Path("pcap/test")
-	@Produces("text/html")
-	public Response  index() throws URISyntaxException { 
-		return Response.ok("ALL GOOD").build();   
-	}
-	
-	
-  public PcapsResponse getPcaps(List<String> keys, String lastRowKey,
-      long startTime, long endTime, boolean includeReverseTraffic,
-      boolean includeDuplicateLastRow, long maxResultSize) throws IOException {
-    Assert
-        .isTrue(
-            checkIfValidInput(keys, lastRowKey),
-            "No valid input. One of the value must be present from {keys, lastRowKey}");
-    LOGGER.info(" keys=" + keys.toString() + ";  lastRowKey="
-        + lastRowKey);
-
-    PcapsResponse pcapsResponse = new PcapsResponse();
-    // 1. Process partial response key
-    if (StringUtils.isNotEmpty(lastRowKey)) {
-      pcapsResponse = processKey(pcapsResponse, lastRowKey, startTime,
-          endTime, true, includeDuplicateLastRow, maxResultSize);
-      // LOGGER.debug("after scanning lastRowKey=" +
-      // pcapsResponse.toString()+"*********************************************************************");
-      if (pcapsResponse.getStatus() == PcapsResponse.Status.PARTIAL) {
-        return pcapsResponse;
-      }
-    }
-    // 2. Process input keys
-    List<String> sortedKeys = sortKeysByAscOrder(keys, includeReverseTraffic);
-    List<String> unprocessedKeys = new ArrayList<String>();
-    unprocessedKeys.addAll(sortedKeys);
-    if (StringUtils.isNotEmpty(lastRowKey)) {
-      unprocessedKeys.clear();
-      unprocessedKeys = getUnprocessedSublistOfKeys(sortedKeys,
-          lastRowKey);
-    }
-    LOGGER.info("unprocessedKeys in getPcaps" + unprocessedKeys.toString());
-    if (!CollectionUtils.isEmpty(unprocessedKeys)) {
-      for (int i = 0; i < unprocessedKeys.size(); i++) {
-        pcapsResponse = processKey(pcapsResponse, unprocessedKeys.get(i),
-            startTime, endTime, false, includeDuplicateLastRow, maxResultSize);
-        // LOGGER.debug("after scanning input unprocessedKeys.get(" + i + ") ="
-        // +
-        // pcapsResponse.toString()+"*********************************************************************");
-        if (pcapsResponse.getStatus() == PcapsResponse.Status.PARTIAL) {
-          return pcapsResponse;
-        }
-      }
-    }
-    return pcapsResponse;
-  }
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see com.cisco.opensoc.hbase.client.IPcapGetter#getPcaps(java.lang.String, long,
-   * long, boolean)
-   */
- 
-  public PcapsResponse getPcaps(String key, long startTime, long endTime,
-      boolean includeReverseTraffic) throws IOException {
-    Assert.hasText(key, "key must not be null or empty");
-    return getPcaps(Arrays.asList(key), null, startTime, endTime,
-        includeReverseTraffic, false, ConfigurationUtil.getDefaultResultSize());
-  }
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see com.cisco.opensoc.hbase.client.IPcapGetter#getPcaps(java.util.List)
-   */
- 
-  public PcapsResponse getPcaps(List<String> keys) throws IOException {
-    Assert.notEmpty(keys, "'keys' must not be null or empty");
-    return getPcaps(keys, null, -1, -1,
-        ConfigurationUtil.isDefaultIncludeReverseTraffic(), false,
-        ConfigurationUtil.getDefaultResultSize());
-  }
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see com.cisco.opensoc.hbase.client.IPcapGetter#getPcaps(java.lang.String)
-   */
- 
-  public PcapsResponse getPcaps(String key) throws IOException {
-    Assert.hasText(key, "key must not be null or empty");
-    return getPcaps(Arrays.asList(key), null, -1, -1,
-        ConfigurationUtil.isDefaultIncludeReverseTraffic(), false,
-        ConfigurationUtil.getDefaultResultSize());
-  }
-
-  /**
-   * Always returns the singleton instance.
-   * 
-   * @return IPcapGetter singleton instance
-   * @throws IOException
-   *           Signals that an I/O exception has occurred.
-   */
-  public static IPcapGetter getInstance() throws IOException {
-    if (pcapGetterHBase == null) {
-      synchronized (PcapGetterHBaseImpl.class) {
-        if (pcapGetterHBase == null) {
-          pcapGetterHBase = new PcapGetterHBaseImpl();
-        }
-      }
-    }
-    return pcapGetterHBase;
-  }
-
-  /**
-   * Instantiates a new pcap getter h base impl.
-   */
-  private PcapGetterHBaseImpl() {
-  }
-
-  /**
-   * Adds reverse keys to the list if the flag 'includeReverseTraffic' is set to
-   * true; removes duplicates and sorts the list by ascending order;.
-   * 
-   * @param keys
-   *          input keys
-   * @param includeReverseTraffic
-   *          flag whether or not to include reverse traffic
-   * @return List<String>
-   */
-  @VisibleForTesting
-  List<String> sortKeysByAscOrder(List<String> keys,
-      boolean includeReverseTraffic) {
-    Assert.notEmpty(keys, "'keys' must not be null");
-    if (includeReverseTraffic) {
-      keys.addAll(PcapHelper.reverseKey(keys));
-    }
-    List<String> deDupKeys = removeDuplicateKeys(keys);
-    Collections.sort(deDupKeys);
-    return deDupKeys;
-  }
-
-  /**
-   * Removes the duplicate keys.
-   * 
-   * @param keys
-   *          the keys
-   * @return the list
-   */
-  @VisibleForTesting
-public
-  List<String> removeDuplicateKeys(List<String> keys) {
-    Set<String> set = new HashSet<String>(keys);
-    return new ArrayList<String>(set);
-  }
-
-  /**
-   * <p>
-   * Returns the sublist starting from the element after the lastRowKey
-   * to the last element in the list; if the 'lastRowKey' is not matched
-   * the complete list will be returned.
-   * </p>
-   * 
-   * <pre>
-   * Eg :
-   *  keys = [18800006-1800000b-06-0019-caac, 18800006-1800000b-06-0050-5af6, 18800006-1800000b-11-0035-3810]
-   *  lastRowKey = "18800006-1800000b-06-0019-caac-65140-40815"
-   *  and the response from this method [18800006-1800000b-06-0050-5af6, 18800006-1800000b-11-0035-3810]
-   * </pre>
-   * 
-   * @param keys
-   *          keys
-   * @param lastRowKey
-   *          last row key of the previous partial response
-   * @return List<String>
-   */
-  @VisibleForTesting
-  List<String> getUnprocessedSublistOfKeys(List<String> keys,
-      String lastRowKey) {
-    Assert.notEmpty(keys, "'keys' must not be null");
-    Assert.hasText(lastRowKey, "'lastRowKey' must not be null");
-    String partialKey = getTokens(lastRowKey, 5);
-    int startIndex = 0;
-    for (int i = 0; i < keys.size(); i++) {
-      if (partialKey.equals(keys.get(i))) {
-        startIndex = i + 1;
-        break;
-      }
-    }
-    List<String> unprocessedKeys = keys.subList(startIndex, keys.size());
-    return unprocessedKeys;
-  }
-
-  /**
-   * Returns the first 'noOfTokens' tokens from the given key; token delimiter
-   * "-";.
-   * 
-   * @param key
-   *          given key
-   * @param noOfTokens
-   *          number of tokens to retrieve
-   * @return the tokens
-   */
-  @VisibleForTesting
-  String getTokens(String key, int noOfTokens) {
-    String delimeter = HBaseConfigConstants.PCAP_KEY_DELIMETER;
-    String regex = "\\" + delimeter;
-    String[] keyTokens = key.split(regex);
-    Assert.isTrue(noOfTokens < keyTokens.length,
-        "Invalid value for 'noOfTokens'");
-    StringBuffer sbf = new StringBuffer();
-    for (int i = 0; i < noOfTokens; i++) {
-      sbf.append(keyTokens[i]);
-      if (i != (noOfTokens - 1)) {
-        sbf.append(HBaseConfigConstants.PCAP_KEY_DELIMETER);
-      }
-
-    }
-    return sbf.toString();
-  }
-
-  /**
-   * Process key.
-   * 
-   * @param pcapsResponse
-   *          the pcaps response
-   * @param key
-   *          the key
-   * @param startTime
-   *          the start time
-   * @param endTime
-   *          the end time
-   * @param isPartialResponse
-   *          the is partial response
-   * @param includeDuplicateLastRow
-   *          the include duplicate last row
-   * @param maxResultSize
-   *          the max result size
-   * @return the pcaps response
-   * @throws IOException
-   *           Signals that an I/O exception has occurred.
-   */
-  @VisibleForTesting
-  PcapsResponse processKey(PcapsResponse pcapsResponse, String key,
-      long startTime, long endTime, boolean isPartialResponse,
-      boolean includeDuplicateLastRow, long maxResultSize) throws IOException {
-    HTable table = null;
-    Scan scan = null;
-    List<Cell> scannedCells = null;
-    try {
-      // 1. Create start and stop row for the key;
-      Map<String, String> keysMap = createStartAndStopRowKeys(key,
-          isPartialResponse, includeDuplicateLastRow);
-
-      // 2. if the input key contains all fragments (7) and it is not part
-      // of previous partial response (isPartialResponse),
-      // 'keysMap' will be null; do a Get; currently not doing any
-      // response size related checks for Get;
-      // by default all cells from a specific row are sorted by timestamp
-      if (keysMap == null) {
-        Get get = createGetRequest(key, startTime, endTime);
-        List<Cell> cells = executeGetRequest(table, get);
-        for (Cell cell : cells) {
-          pcapsResponse.addPcaps(CellUtil.cloneValue(cell));
-        }
-        return pcapsResponse;
-      }
-      // 3. Create and execute Scan request
-      scan = createScanRequest(pcapsResponse, keysMap, startTime, endTime,
-          maxResultSize);
-      scannedCells = executeScanRequest(table, scan);
-      LOGGER.info("scannedCells size :" + scannedCells.size());
-      addToResponse(pcapsResponse, scannedCells, maxResultSize);
-
-    } catch (IOException e) {
-      LOGGER.error("Exception occurred while fetching Pcaps for the keys :"
-          + key, e);
-      if (e instanceof ZooKeeperConnectionException
-          || e instanceof MasterNotRunningException
-          || e instanceof NoServerForRegionException) {
-        int maxRetryLimit = ConfigurationUtil.getConnectionRetryLimit();
-        System.out.println("maxRetryLimit =" + maxRetryLimit);
-        for (int attempt = 1; attempt <= maxRetryLimit; attempt++) {
-          System.out.println("attempting  =" + attempt);
-          try {
-            HBaseConfigurationUtil.closeConnection(); // closing the
-            // existing
-            // connection
-            // and retry,
-            // it will
-            // create a new
-            // HConnection
-            scannedCells = executeScanRequest(table, scan);
-            addToResponse(pcapsResponse, scannedCells, maxResultSize);
-            break;
-          } catch (IOException ie) {
-            if (attempt == maxRetryLimit) {
-              LOGGER.error("Throwing the exception after retrying "
-                  + maxRetryLimit + " times.");
-              throw e;
-            }
-          }
-        }
-      }
-
-    } finally {
-      if (table != null) {
-        table.close();
-      }
-    }
-    return pcapsResponse;
-  }
-
-  /**
-   * Adds the to response.
-   * 
-   * @param pcapsResponse
-   *          the pcaps response
-   * @param scannedCells
-   *          the scanned cells
-   * @param maxResultSize
-   *          the max result size
-   */
-  private void addToResponse(PcapsResponse pcapsResponse,
-      List<Cell> scannedCells, long maxResultSize) {
-    String lastKeyFromCurrentScan = null;
-    if (scannedCells != null && scannedCells.size() > 0) {
-      lastKeyFromCurrentScan = new String(CellUtil.cloneRow(scannedCells
-          .get(scannedCells.size() - 1)));
-    }
-    // 4. calculate the response size
-    Collections.sort(scannedCells, PcapHelper.getCellTimestampComparator());
-    for (Cell sortedCell : scannedCells) {
-      pcapsResponse.addPcaps(CellUtil.cloneValue(sortedCell));
-    }
-    if (!pcapsResponse.isResonseSizeWithinLimit(maxResultSize)) {
-      pcapsResponse.setStatus(PcapsResponse.Status.PARTIAL); // response size
-                                                             // reached
-      pcapsResponse.setLastRowKey(new String(lastKeyFromCurrentScan));
-    }
-  }
-
-  /**
-   * Builds start and stop row keys according to the following logic : 1.
-   * Creates tokens out of 'key' using pcap_id delimiter ('-') 2. if the input
-   * 'key' contains (assume : configuredTokensInRowKey=7 and
-   * minimumTokensIninputKey=5): a). 5 tokens
-   * ("srcIp-dstIp-protocol-srcPort-dstPort") startKey =
-   * "srcIp-dstIp-protocol-srcPort-dstPort-00000-00000" stopKey =
-   * "srcIp-dstIp-protocol-srcPort-dstPort-99999-99999" b). 6 tokens
-   * ("srcIp-dstIp-protocol-srcPort-dstPort-id1") startKey =
-   * "srcIp-dstIp-protocol-srcPort-dstPort-id1-00000" stopKey =
-   * "srcIp-dstIp-protocol-srcPort-dstPort-id1-99999"
-   * 
-   * c). 7 tokens ("srcIp-dstIp-protocol-srcPort-dstPort-id1-id2") 1>. if the
-   * key is NOT part of the partial response from previous request, return
-   * 'null' 2>. if the key is part of partial response from previous request
-   * startKey = "srcIp-dstIp-protocol-srcPort-dstPort-id1-(id2+1)"; 1 is added
-   * to exclude this key as it was included in the previous request stopKey =
-   * "srcIp-dstIp-protocol-srcPort-dstPort-99999-99999"
-   * 
-   * @param key
-   *          the key
-   * @param isLastRowKey
-   *          if the key is part of partial response
-   * @param includeDuplicateLastRow
-   *          the include duplicate last row
-   * @return Map<String, String>
-   */
-  @VisibleForTesting
-  Map<String, String> createStartAndStopRowKeys(String key,
-      boolean isLastRowKey, boolean includeDuplicateLastRow) {
-    String delimeter = HBaseConfigConstants.PCAP_KEY_DELIMETER;
-    String regex = "\\" + delimeter;
-    String[] keyTokens = key.split(regex);
-
-    String startKey = null;
-    String endKey = null;
-    Map<String, String> map = new HashMap<String, String>();
-
-    int configuredTokensInRowKey = ConfigurationUtil
-        .getConfiguredTokensInRowkey();
-    int minimumTokensIninputKey = ConfigurationUtil
-        .getMinimumTokensInInputkey();
-    Assert
-        .isTrue(
-            minimumTokensIninputKey <= configuredTokensInRowKey,
-            "tokens in the input key (separated by '-'), must be less than or equal to the tokens used in hbase table row key ");
-    // in case if the input key contains 'configuredTokensInRowKey' tokens and
-    // it is NOT a
-    // partial response key, do a Get instead of Scan
-    if (keyTokens.length == configuredTokensInRowKey) {
-      if (!isLastRowKey) {
-        return null;
-      }
-      // it is a partial response key; 'startKey' is same as input partial
-      // response key; 'endKey' can be built by replacing
-      // (configuredTokensInRowKey - minimumTokensIninputKey) tokens
-      // of input partial response key with '99999'
-      if (keyTokens.length == minimumTokensIninputKey) {
-        return null;
-      }
-      int appendingTokenSlots = configuredTokensInRowKey
-          - minimumTokensIninputKey;
-      if (appendingTokenSlots > 0) {
-        String partialKey = getTokens(key, minimumTokensIninputKey);
-        StringBuffer sbfStartNew = new StringBuffer(partialKey);
-        StringBuffer sbfEndNew = new StringBuffer(partialKey);
-        for (int i = 0; i < appendingTokenSlots; i++) {
-          if (i == (appendingTokenSlots - 1)) {
-            if (!includeDuplicateLastRow) {
-              sbfStartNew
-                  .append(HBaseConfigConstants.PCAP_KEY_DELIMETER)
-                  .append(
-                      Integer.valueOf(keyTokens[minimumTokensIninputKey + i]) + 1);
-            } else {
-              sbfStartNew.append(HBaseConfigConstants.PCAP_KEY_DELIMETER)
-                  .append(keyTokens[minimumTokensIninputKey + i]);
-            }
-          } else {
-            sbfStartNew.append(HBaseConfigConstants.PCAP_KEY_DELIMETER).append(
-                keyTokens[minimumTokensIninputKey + i]);
-          }
-          sbfEndNew.append(HBaseConfigConstants.PCAP_KEY_DELIMETER).append(
-              getMaxLimitForAppendingTokens());
-        }
-        startKey = sbfStartNew.toString();
-        endKey = sbfEndNew.toString();
-      }
-    } else {
-      StringBuffer sbfStart = new StringBuffer(key);
-      StringBuffer sbfEnd = new StringBuffer(key);
-      for (int i = keyTokens.length; i < configuredTokensInRowKey; i++) {
-        sbfStart.append(HBaseConfigConstants.PCAP_KEY_DELIMETER).append(
-            getMinLimitForAppendingTokens());
-        sbfEnd.append(HBaseConfigConstants.PCAP_KEY_DELIMETER).append(
-            getMaxLimitForAppendingTokens());
-      }
-      startKey = sbfStart.toString();
-      endKey = sbfEnd.toString();
-    }
-    map.put(HBaseConfigConstants.START_KEY, startKey);
-    map.put(HBaseConfigConstants.END_KEY, endKey);
-
-    return map;
-  }
-
-  /**
-   * Returns false if keys is empty or null AND lastRowKey is null or
-   * empty; otherwise returns true;.
-   * 
-   * @param keys
-   *          input row keys
-   * @param lastRowKey
-   *          partial response key
-   * @return boolean
-   */
-  @VisibleForTesting
-  boolean checkIfValidInput(List<String> keys, String lastRowKey) {
-    if (CollectionUtils.isEmpty(keys)
-        && StringUtils.isEmpty(lastRowKey)) {
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * Executes the given Get request.
-   * 
-   * @param table
-   *          hbase table
-   * @param get
-   *          Get
-   * @return List<Cell>
-   * @throws IOException
-   *           Signals that an I/O exception has occurred.
-   */
-  private List<Cell> executeGetRequest(HTable table, Get get)
-      throws IOException {
-    LOGGER.info("Get :" + get.toString());
-    table = (HTable) HBaseConfigurationUtil.getConnection().getTable(
-        ConfigurationUtil.getTableName());
-    Result result = table.get(get);
-    List<Cell> cells = result.getColumnCells(
-        ConfigurationUtil.getColumnFamily(),
-        ConfigurationUtil.getColumnQualifier());
-    return cells;
-  }
-
-  /**
-   * Execute scan request.
-   * 
-   * @param table
-   *          hbase table
-   * @param scan
-   *          the scan
-   * @return the list
-   * @throws IOException
-   *           Signals that an I/O exception has occurred.
-   */
-  private List<Cell> executeScanRequest(HTable table, Scan scan)
-      throws IOException {
-    LOGGER.info("Scan :" + scan.toString());
-    table = (HTable) HBaseConfigurationUtil.getConnection().getTable(
-    		ConfigurationUtil.getConfiguration().getString("hbase.table.name"));
-    ResultScanner resultScanner = table.getScanner(scan);
-    List<Cell> scannedCells = new ArrayList<Cell>();
-    for (Result result = resultScanner.next(); result != null; result = resultScanner
-        .next()) {
-      List<Cell> cells = result.getColumnCells(
-          ConfigurationUtil.getColumnFamily(),
-          ConfigurationUtil.getColumnQualifier());
-      if (cells != null) {
-        for (Cell cell : cells) {
-          scannedCells.add(cell);
-        }
-      }
-    }
-    return scannedCells;
-  }
-
-  /**
-   * Creates the get request.
-   * 
-   * @param key
-   *          the key
-   * @param startTime
-   *          the start time
-   * @param endTime
-   *          the end time
-   * @return the gets the
-   * @throws IOException
-   *           Signals that an I/O exception has occurred.
-   */
-  @VisibleForTesting
-  Get createGetRequest(String key, long startTime, long endTime)
-      throws IOException {
-    Get get = new Get(Bytes.toBytes(key));
-    // set family name
-    get.addFamily(ConfigurationUtil.getColumnFamily());
-
-    // set column family, qualifier
-    get.addColumn(ConfigurationUtil.getColumnFamily(),
-        ConfigurationUtil.getColumnQualifier());
-
-    // set max versions
-    get.setMaxVersions(ConfigurationUtil.getMaxVersions());
-
-    // set time range
-    setTimeRangeOnGet(get, startTime, endTime);
-    return get;
-  }
-
-  /**
-   * Creates the scan request.
-   * 
-   * @param pcapsResponse
-   *          the pcaps response
-   * @param keysMap
-   *          the keys map
-   * @param startTime
-   *          the start time
-   * @param endTime
-   *          the end time
-   * @param maxResultSize
-   *          the max result size
-   * @return the scan
-   * @throws IOException
-   *           Signals that an I/O exception has occurred.
-   */
-  @VisibleForTesting
-  Scan createScanRequest(PcapsResponse pcapsResponse,
-      Map<String, String> keysMap, long startTime, long endTime,
-      long maxResultSize) throws IOException {
-    Scan scan = new Scan();
-    // set column family, qualifier
-    scan.addColumn(ConfigurationUtil.getColumnFamily(),
-        ConfigurationUtil.getColumnQualifier());
-
-    // set start and stop keys
-    scan.setStartRow(keysMap.get(HBaseConfigConstants.START_KEY).getBytes());
-    scan.setStopRow(keysMap.get(HBaseConfigConstants.END_KEY).getBytes());
-
-    // set max results size : remaining size = max results size - ( current
-    // pcaps response size + possible maximum row size)
-    long remainingSize = maxResultSize
-        - (pcapsResponse.getResponseSize() + ConfigurationUtil.getMaxRowSize());
-
-    if (remainingSize > 0) {
-      scan.setMaxResultSize(remainingSize);
-    }
-    // set max versions
-    scan.setMaxVersions(ConfigurationUtil.getConfiguration().getInt(
-        "hbase.table.column.maxVersions"));
-
-    // set time range
-    setTimeRangeOnScan(scan, startTime, endTime);
-    return scan;
-  }
-
-  /**
-   * Sets the time range on scan.
-   * 
-   * @param scan
-   *          the scan
-   * @param startTime
-   *          the start time
-   * @param endTime
-   *          the end time
-   * @throws IOException
-   *           Signals that an I/O exception has occurred.
-   */
-  private void setTimeRangeOnScan(Scan scan, long startTime, long endTime)
-      throws IOException {
-    boolean setTimeRange = true;
-    if (startTime < 0 && endTime < 0) {
-      setTimeRange = false;
-    }
-    if (setTimeRange) {
-      if (startTime < 0) {
-        startTime = 0;
-      } else {
-        startTime = PcapHelper.convertToDataCreationTimeUnit(startTime);
-      }
-      if (endTime < 0) {
-        endTime = Long.MAX_VALUE;
-      } else {
-        endTime = PcapHelper.convertToDataCreationTimeUnit(endTime);
-      }
-      Assert.isTrue(startTime < endTime,
-          "startTime value must be less than endTime value");
-      scan.setTimeRange(startTime, endTime);
-    }
-  }
-
-  /**
-   * Sets the time range on get.
-   * 
-   * @param get
-   *          the get
-   * @param startTime
-   *          the start time
-   * @param endTime
-   *          the end time
-   * @throws IOException
-   *           Signals that an I/O exception has occurred.
-   */
-  private void setTimeRangeOnGet(Get get, long startTime, long endTime)
-      throws IOException {
-    boolean setTimeRange = true;
-    if (startTime < 0 && endTime < 0) {
-      setTimeRange = false;
-    }
-    if (setTimeRange) {
-      if (startTime < 0) {
-        startTime = 0;
-      } else {
-        startTime = PcapHelper.convertToDataCreationTimeUnit(startTime);
-      }
-      if (endTime < 0) {
-        endTime = Long.MAX_VALUE;
-      } else {
-        endTime = PcapHelper.convertToDataCreationTimeUnit(endTime);
-      }
-      Assert.isTrue(startTime < endTime,
-          "startTime value must be less than endTime value");
-      get.setTimeRange(startTime, endTime);
-    }
-  }
-
-  /**
-   * Gets the min limit for appending tokens.
-   * 
-   * @return the min limit for appending tokens
-   */
-  private String getMinLimitForAppendingTokens() {
-    int digits = ConfigurationUtil.getAppendingTokenDigits();
-    StringBuffer sbf = new StringBuffer();
-    for (int i = 0; i < digits; i++) {
-      sbf.append("0");
-    }
-    return sbf.toString();
-  }
-
-  /**
-   * Gets the max limit for appending tokens.
-   * 
-   * @return the max limit for appending tokens
-   */
-  private String getMaxLimitForAppendingTokens() {
-    int digits = ConfigurationUtil.getAppendingTokenDigits();
-    StringBuffer sbf = new StringBuffer();
-    for (int i = 0; i < digits; i++) {
-      sbf.append("9");
-    }
-    return sbf.toString();
-  }
-
-  /**
-   * The main method.
-   * 
-   * @param args
-   *          the arguments
-   * 
-   * @throws IOException
-   *           Signals that an I/O exception has occurred.
-   */
-  public static void main(String[] args) throws IOException {
-    if (args == null || args.length < 2) {
-      usage();
-      return;
-    }
-    String outputFileName = null;
-    outputFileName = args[1];
-    List<String> keys = Arrays.asList(StringUtils.split(args[2], ","));
-    System.out.println("Geting keys " + keys);
-    long startTime = 0;
-    long endTime = Long.MAX_VALUE;
-    if (args.length > 3) {
-      startTime = Long.valueOf(args[3]);
-    }
-    if (args.length > 4) {
-      endTime = Long.valueOf(args[4]);
-    }
-    System.out.println("With start time " + startTime + " and end time "
-        + endTime);
-    PcapGetterHBaseImpl downloader = new PcapGetterHBaseImpl();
-    PcapsResponse pcaps = downloader.getPcaps(keys, null, startTime, endTime,
-        false, false, 6);
-    File file = new File(outputFileName);
-    FileUtils.write(file, "", false);
-    FileUtils.writeByteArrayToFile(file, pcaps.getPcaps(), true);
-  }
-
-  /**
-   * Usage.
-   */
-  private static void usage() {
-    System.out.println("java " + PcapGetterHBaseImpl.class.getName() // $codepro.audit.disable
-        // debuggingCode
-        + " <zk quorum> <output file> <start key> [stop key]");
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapHelper.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapHelper.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapHelper.java
deleted file mode 100644
index 893d176..0000000
--- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapHelper.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.pcapservice;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.log4j.Logger;
-import org.mortbay.log.Log;
-import org.springframework.util.Assert;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * utility class which holds methods related to time conversions, building
- * reverse keys.
- */
-public class PcapHelper {
-
-  /** The Constant LOGGER. */
-  private static final Logger LOGGER = Logger.getLogger(PcapHelper.class);
-
-  /** The cell timestamp comparator. */
-  private static CellTimestampComparator CELL_TIMESTAMP_COMPARATOR = new CellTimestampComparator();
-
-  /**
-   * The Enum TimeUnit.
-   */
-  public enum TimeUnit {
-
-    /** The seconds. */
-    SECONDS,
-    /** The millis. */
-    MILLIS,
-    /** The micros. */
-    MICROS,
-    /** The unknown. */
-    UNKNOWN
-  };
-
-  /**
-   * Converts the given time to the 'hbase' data creation time unit.
-   * 
-   * @param inputTime
-   *          the input time
-   * @return the long
-   */
-  public static long convertToDataCreationTimeUnit(long inputTime) {
-    if (inputTime <= 9999999999L) {
-      return convertSecondsToDataCreationTimeUnit(inputTime); // input time unit
-                                                              // is in seconds
-    } else if (inputTime <= 9999999999999L) {
-      return convertMillisToDataCreationTimeUnit(inputTime); // input time unit
-                                                             // is in millis
-    } else if (inputTime <= 9999999999999999L) {
-      return convertMicrosToDataCreationTimeUnit(inputTime); // input time unit
-                                                             // it in micros
-    }
-    return inputTime; // input time unit is unknown
-  }
-
-  /**
-   * Returns the 'hbase' data creation time unit by reading
-   * 'hbase.table.data.time.unit' property in 'hbase-config' properties file; If
-   * none is mentioned in properties file, returns <code>TimeUnit.UNKNOWN</code>
-   * 
-   * @return TimeUnit
-   */
-  @VisibleForTesting
-  public static TimeUnit getDataCreationTimeUnit() {
-    String timeUnit = ConfigurationUtil.getConfiguration().getString(
-        "hbase.table.data.time.unit");
-    LOGGER.debug("hbase.table.data.time.unit=" + timeUnit.toString());
-    if (StringUtils.isNotEmpty(timeUnit)) {
-      return TimeUnit.valueOf(timeUnit);
-    }
-    return TimeUnit.UNKNOWN;
-  }
-
-  /**
-   * Convert seconds to data creation time unit.
-   * 
-   * @param inputTime
-   *          the input time
-   * @return the long
-   */
-  @VisibleForTesting
-  public static long convertSecondsToDataCreationTimeUnit(long inputTime) {
-    System.out.println("convert Seconds To DataCreation TimeUnit");
-    TimeUnit dataCreationTimeUnit = getDataCreationTimeUnit();
-    if (TimeUnit.SECONDS == dataCreationTimeUnit) {
-      return inputTime;
-    } else if (TimeUnit.MILLIS == dataCreationTimeUnit) {
-      return inputTime * 1000;
-    } else if (TimeUnit.MICROS == dataCreationTimeUnit) {
-      return inputTime * 1000 * 1000;
-    }
-    return inputTime;
-  }
-
-  /**
-   * Builds the reverseKey to fetch the pcaps in the reverse traffic
-   * (destination to source).
-   * 
-   * @param key
-   *          indicates hbase rowKey (partial or full) in the format
-   *          "srcAddr-dstAddr-protocol-srcPort-dstPort-fragment"
-   * @return String indicates the key in the format
-   *         "dstAddr-srcAddr-protocol-dstPort-srcPort"
-   */
-  public static String reverseKey(String key) {
-    Assert.hasText(key, "key must not be null or empty");
-    String delimeter = HBaseConfigConstants.PCAP_KEY_DELIMETER;
-    String regex = "\\" + delimeter;
-    StringBuffer sb = new StringBuffer();
-    try {
-      String[] tokens = key.split(regex);
-      Assert
-          .isTrue(
-              (tokens.length == 5 || tokens.length == 6 || tokens.length == 7),
-              "key is not in the format : 'srcAddr-dstAddr-protocol-srcPort-dstPort-{ipId-fragment identifier}'");
-      sb.append(tokens[1]).append(delimeter).append(tokens[0])
-          .append(delimeter).append(tokens[2]).append(delimeter)
-          .append(tokens[4]).append(delimeter).append(tokens[3]);
-    } catch (Exception e) {
-      Log.warn("Failed to reverse the key. Reverse scan won't be performed.", e);
-    }
-    return sb.toString();
-  }
-
-  /**
-   * Builds the reverseKeys to fetch the pcaps in the reverse traffic
-   * (destination to source). If all keys in the input are not in the expected
-   * format, it returns an empty list;
-   * 
-   * @param keys
-   *          indicates list of hbase rowKeys (partial or full) in the format
-   *          "srcAddr-dstAddr-protocol-srcPort-dstPort-fragment"
-   * @return List<String> indicates the list of keys in the format
-   *         "dstAddr-srcAddr-protocol-dstPort-srcPort"
-   */
-  public static List<String> reverseKey(List<String> keys) {
-    Assert.notEmpty(keys, "'keys' must not be null or empty");
-    List<String> reverseKeys = new ArrayList<String>();
-    for (String key : keys) {
-      if (key != null) {
-        String reverseKey = reverseKey(key);
-        if (StringUtils.isNotEmpty(reverseKey)) {
-          reverseKeys.add(reverseKey);
-        }
-      }
-    }
-    return reverseKeys;
-  }
-
-  /**
-   * Returns Comparator for sorting pcaps cells based on the timestamp (dsc).
-   * 
-   * @return CellTimestampComparator
-   */
-  public static CellTimestampComparator getCellTimestampComparator() {
-    return CELL_TIMESTAMP_COMPARATOR;
-  }
-
-  /**
-   * Convert millis to data creation time unit.
-   * 
-   * @param inputTime
-   *          the input time
-   * @return the long
-   */
-  @VisibleForTesting
-  private static long convertMillisToDataCreationTimeUnit(long inputTime) {
-    System.out.println("convert Millis To DataCreation TimeUnit");
-    TimeUnit dataCreationTimeUnit = getDataCreationTimeUnit();
-    if (TimeUnit.SECONDS == dataCreationTimeUnit) {
-      return (inputTime / 1000);
-    } else if (TimeUnit.MILLIS == dataCreationTimeUnit) {
-      return inputTime;
-    } else if (TimeUnit.MICROS == dataCreationTimeUnit) {
-      return inputTime * 1000;
-    }
-    return inputTime;
-  }
-
-  /**
-   * Convert micros to data creation time unit.
-   * 
-   * @param inputTime
-   *          the input time
-   * @return the long
-   */
-  @VisibleForTesting
-  private static long convertMicrosToDataCreationTimeUnit(long inputTime) {
-    System.out.println("convert Micros To DataCreation TimeUnit");
-    TimeUnit dataCreationTimeUnit = getDataCreationTimeUnit();
-    if (TimeUnit.SECONDS == dataCreationTimeUnit) {
-      return inputTime / (1000 * 1000);
-    } else if (TimeUnit.MILLIS == dataCreationTimeUnit) {
-      return inputTime / 1000;
-    } else if (TimeUnit.MICROS == dataCreationTimeUnit) {
-      return inputTime;
-    }
-    return inputTime;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
index ce3cec9..1d0beb8 100644
--- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
+++ b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
@@ -20,158 +20,83 @@ package org.apache.metron.pcapservice;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.EnumMap;
 import java.util.List;
 
 import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.QueryParam;
+import javax.ws.rs.*;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
+import com.google.common.base.Joiner;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.Logger;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.metron.pcap.utils.PcapUtils;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.pcap.mr.PcapJob;
 
 @Path("/")
 public class PcapReceiverImplRestEasy {
 
-	/** The Constant LOGGER. */
-	private static final Logger LOGGER = Logger
-			.getLogger(PcapReceiverImplRestEasy.class);
-
-	/** The Constant HEADER_CONTENT_DISPOSITION_NAME. */
-	private static final String HEADER_CONTENT_DISPOSITION_NAME = "Content-Disposition";
-
-	/** The Constant HEADER_CONTENT_DISPOSITION_VALUE. */
-	private static final String HEADER_CONTENT_DISPOSITION_VALUE = "attachment; filename=\"managed-threat.pcap\"";
-
-	/** partial response key header name. */
-	private static final String HEADER_PARTIAL_RESPONE_KEY = "lastRowKey";
-
-	@GET
-	@Path("pcapGetter/getPcapsByKeys")
-	public Response getPcapsByKeys(
-			@QueryParam("keys") List<String> keys,
-			@QueryParam("lastRowKey") String lastRowKey,
-			@DefaultValue("-1") @QueryParam("startTime") long startTime,
-			@DefaultValue("-1") @QueryParam("endTime") long endTime,
-			@QueryParam("includeDuplicateLastRow") boolean includeDuplicateLastRow,
-			@QueryParam("includeReverseTraffic") boolean includeReverseTraffic,
-			@QueryParam("maxResponseSize") String maxResponseSize,
-			@Context HttpServletResponse response) throws IOException {
-		PcapsResponse pcapResponse = null;
-
-		if (keys == null || keys.size() == 0)
-			return Response.serverError().status(Response.Status.NO_CONTENT)
-					.entity("'keys' must not be null or empty").build();
-
-		try {
-			IPcapGetter pcapGetter = PcapGetterHBaseImpl.getInstance();
-			pcapResponse = pcapGetter.getPcaps(parseKeys(keys), lastRowKey,
-					startTime, endTime, includeReverseTraffic,
-					includeDuplicateLastRow,
-					ConfigurationUtil.validateMaxResultSize(maxResponseSize));
-			LOGGER.info("pcaps response in REST layer ="
-					+ pcapResponse.toString());
-
-			// return http status '204 No Content' if the pcaps response size is
-			// 0
-			if (pcapResponse == null || pcapResponse.getResponseSize() == 0) {
-
-				return Response.status(Response.Status.NO_CONTENT).build();
-			}
-
-			// return http status '206 Partial Content', the partial response
-			// file and
-			// 'lastRowKey' header , if the pcaps response status is 'PARTIAL'
-
-			response.setHeader(HEADER_CONTENT_DISPOSITION_NAME,
-					HEADER_CONTENT_DISPOSITION_VALUE);
-
-			if (pcapResponse.getStatus() == PcapsResponse.Status.PARTIAL) {
-
-				response.setHeader(HEADER_PARTIAL_RESPONE_KEY,
-						pcapResponse.getLastRowKey());
-
-				return Response
-						.ok(pcapResponse.getPcaps(),
-								MediaType.APPLICATION_OCTET_STREAM).status(206)
-						.build();
-
-			}
-
-		} catch (IOException e) {
-			LOGGER.error(
-					"Exception occurred while fetching Pcaps for the keys :"
-							+ keys.toString(), e);
-			throw e;
-		}
-
-		// return http status '200 OK' along with the complete pcaps response
-		// file,
-		// and headers
-		// return new ResponseEntity<byte[]>(pcapResponse.getPcaps(), headers,
-		// HttpStatus.OK);
-
-		return Response
-				.ok(pcapResponse.getPcaps(), MediaType.APPLICATION_OCTET_STREAM)
-				.status(200).build();
-
-	}
-	
-	
-	@GET
-	@Path("/pcapGetter/getPcapsByKeyRange")
-
-	  public Response getPcapsByKeyRange(
-	      @QueryParam("startKey") String startKey,
-	      @QueryParam("endKey")String endKey,
-	      @QueryParam("maxResponseSize") String maxResponseSize,
-	      @DefaultValue("-1") @QueryParam("startTime")long startTime,
-	      @DefaultValue("-1") @QueryParam("endTime") long endTime, 
-	      @Context HttpServletResponse servlet_response) throws IOException {
-
-		if (startKey == null || startKey.equals(""))
-			return Response.serverError().status(Response.Status.NO_CONTENT)
-					.entity("'start key' must not be null or empty").build();
-		
-		if (startKey == null || startKey.equals(""))
-			return Response.serverError().status(Response.Status.NO_CONTENT)
-					.entity("'end key' must not be null or empty").build();
-		
-		
-	    byte[] response = null;
-	    try {
-	      IPcapScanner pcapScanner = PcapScannerHBaseImpl.getInstance();
-	      response = pcapScanner.getPcaps(startKey, endKey,
-	          ConfigurationUtil.validateMaxResultSize(maxResponseSize), startTime,
-	          endTime);
-	      if (response == null || response.length == 0) {
-	    	  
-	    	  return Response.status(Response.Status.NO_CONTENT).build();
-	        
-	      }
-	      servlet_response.setHeader(HEADER_CONTENT_DISPOSITION_NAME,
-					HEADER_CONTENT_DISPOSITION_VALUE);
-
-	    } catch (IOException e) {
-	      LOGGER.error(
-	          "Exception occurred while fetching Pcaps for the key range : startKey="
-	              + startKey + ", endKey=" + endKey, e);
-	      throw e;
-	    }
-	    // return http status '200 OK' along with the complete pcaps response file,
-	    // and headers
-	    
-		return Response
-				.ok(response, MediaType.APPLICATION_OCTET_STREAM)
-				.status(200).build();
-	  }
+  /** The Constant LOGGER. */
+  private static final Logger LOGGER = Logger
+          .getLogger(PcapReceiverImplRestEasy.class);
+
+  /** The Constant HEADER_CONTENT_DISPOSITION_NAME. */
+  private static final String HEADER_CONTENT_DISPOSITION_NAME = "Content-Disposition";
+
+  /** The Constant HEADER_CONTENT_DISPOSITION_VALUE. */
+  private static final String HEADER_CONTENT_DISPOSITION_VALUE = "attachment; filename=\"managed-threat.pcap\"";
+
+  /** partial response key header name. */
+  private static final String HEADER_PARTIAL_RESPONE_KEY = "lastRowKey";
+  private static ThreadLocal<Configuration> CONFIGURATION = new ThreadLocal<Configuration>() {
+    /**
+     * Supplies the value a thread sees the first time it reads this
+     * thread-local through {@link #get} without having called
+     * {@link #set} beforehand.
+     * <p>
+     * The stock {@code ThreadLocal#initialValue} returns {@code null};
+     * this override instead constructs a new {@link Configuration}
+     * with its no-argument constructor. Because the value is created
+     * per thread, every thread that touches {@code CONFIGURATION}
+     * works with its own {@code Configuration} object rather than one
+     * shared instance.
+     * <p>
+     * As with any {@code ThreadLocal}, the method runs again for a
+     * thread only if that thread calls {@link #remove} and then reads
+     * the variable once more.
+     *
+     * @return a freshly constructed {@code Configuration}
+     */
+    @Override
+    protected Configuration initialValue() {
+      return new Configuration();
+    }
+  };
+  PcapJob queryUtil = new PcapJob(); // job object that getPcapsByIdentifiers runs its query through
+
+  protected PcapJob getQueryUtil() { // accessor for queryUtil; the endpoint obtains the job via this method, not the field
+    return queryUtil;
+  }
+
+  /** Tells whether {@code port} is a non-empty string that {@code Integer.parseInt} accepts; no range check. */
+  private static boolean isValidPort(String port) {
+    if (port == null || port.equals("")) {
+      return false;
+    }
+    try {
+      Integer.parseInt(port);
+    } catch (NumberFormatException notAnInteger) {
+      return false;
+    }
+    return true;
+  }
 
 	  /*
 	   * (non-Javadoc)
@@ -182,86 +107,104 @@ public class PcapReceiverImplRestEasy {
 	   * java.lang.String, long, long, boolean,
 	   * javax.servlet.http.HttpServletResponse)
 	   */
-	  
-	@GET
-	@Path("/pcapGetter/getPcapsByIdentifiers")
-
-	  public Response getPcapsByIdentifiers(
-	      @QueryParam ("srcIp") String srcIp, 
-	      @QueryParam ("dstIp") String dstIp,
-	      @QueryParam ("protocol") String protocol, 
-	      @QueryParam ("srcPort") String srcPort,
-	      @QueryParam ("dstPort") String dstPort,
-	      @DefaultValue("-1") @QueryParam ("startTime")long startTime,
-	      @DefaultValue("-1") @QueryParam ("endTime")long endTime,
-	      @DefaultValue("false") @QueryParam ("includeReverseTraffic") boolean includeReverseTraffic,
-	      @Context HttpServletResponse servlet_response)
-	      
-	      throws IOException {
-		
-		if (srcIp == null || srcIp.equals(""))
-			return Response.serverError().status(Response.Status.NO_CONTENT)
-					.entity("'srcIp' must not be null or empty").build();
-		
-		if (dstIp == null || dstIp.equals(""))
-			return Response.serverError().status(Response.Status.NO_CONTENT)
-					.entity("'dstIp' must not be null or empty").build();
-		
-		if (protocol == null || protocol.equals(""))
-			return Response.serverError().status(Response.Status.NO_CONTENT)
-					.entity("'protocol' must not be null or empty").build();
-		
-		if (srcPort == null || srcPort.equals(""))
-			return Response.serverError().status(Response.Status.NO_CONTENT)
-					.entity("'srcPort' must not be null or empty").build();
-		
-		if (dstPort == null || dstPort.equals(""))
-			return Response.serverError().status(Response.Status.NO_CONTENT)
-					.entity("'dstPort' must not be null or empty").build();
-		
-	
-	    PcapsResponse response = null;
-	    try {
-	      String sessionKey = PcapUtils.getPartialSessionKey(srcIp, dstIp, protocol,
-	          srcPort, dstPort);
-	      LOGGER.info("sessionKey =" + sessionKey);
-	      IPcapGetter pcapGetter = PcapGetterHBaseImpl.getInstance();
-	      response = pcapGetter.getPcaps(Arrays.asList(sessionKey), null,
-	          startTime, endTime, includeReverseTraffic, false,
-	          ConfigurationUtil.getDefaultResultSize());
-	      if (response == null || response.getResponseSize() == 0) {
-	         return Response.status(Response.Status.NO_CONTENT).build();
-	      }
-	      servlet_response.setHeader(HEADER_CONTENT_DISPOSITION_NAME,
-					HEADER_CONTENT_DISPOSITION_VALUE);
-
-	    } catch (IOException e) {
-	      LOGGER.error("Exception occurred while fetching Pcaps by identifiers :",
-	          e);
-	      throw e;
-	    }
-	    // return http status '200 OK' along with the complete pcaps response file,
-	    // and headers
-	    return Response
-				.ok(response.getPcaps(), MediaType.APPLICATION_OCTET_STREAM)
-				.status(200).build();
-	  }
-	/**
-	 * This method parses the each value in the List using delimiter ',' and
-	 * builds a new List;.
-	 * 
-	 * @param keys
-	 *            list of keys to be parsed
-	 * @return list of keys
-	 */
-	@VisibleForTesting
-	List<String> parseKeys(List<String> keys) {
-		// Assert.notEmpty(keys);
-		List<String> parsedKeys = new ArrayList<String>();
-		for (String key : keys) {
-			parsedKeys.addAll(Arrays.asList(StringUtils.split(
-					StringUtils.trim(key), ",")));
-		}
-		return parsedKeys;
-	}
+  /**
+   * Looks up pcaps matching the given identifiers by delegating to
+   * {@code PcapJob.query} and returns the bytes as an octet stream.
+   * A negative {@code startTime} becomes 0 and a negative {@code endTime}
+   * becomes the current time; both are taken as epoch milliseconds and
+   * converted to nanoseconds before querying. Null {@code srcIp},
+   * {@code dstIp} and {@code protocol} are left out of the query.
+   *
+   * @return 200 with the pcap bytes; 204 when a port is null, empty or not an integer
+   * @throws WebApplicationException if the query fails for any reason
+   */
+  @GET
+  @Path("/pcapGetter/getPcapsByIdentifiers")
+  public Response getPcapsByIdentifiers(
+          @QueryParam ("srcIp") String srcIp,
+          @QueryParam ("dstIp") String dstIp,
+          @QueryParam ("protocol") String protocol,
+          @QueryParam ("srcPort") String srcPort,
+          @QueryParam ("dstPort") String dstPort,
+          @DefaultValue("-1") @QueryParam ("startTime")long startTime,
+          @DefaultValue("-1") @QueryParam ("endTime")long endTime,
+          @DefaultValue("false") @QueryParam ("includeReverseTraffic") boolean includeReverseTraffic,
+          @Context HttpServletResponse servlet_response)
+          throws IOException {
+
+    // NOTE(review): a 204 cannot carry a body, so these messages likely never
+    // reach the client; 400 looks more apt, but the status is left unchanged.
+    if (!isValidPort(srcPort)) {
+      return Response.serverError().status(Response.Status.NO_CONTENT)
+              .entity("'srcPort' must not be null, empty or a non-integer").build();
+    }
+
+    if (!isValidPort(dstPort)) {
+      return Response.serverError().status(Response.Status.NO_CONTENT)
+              .entity("'dstPort' must not be null, empty or a non-integer").build();
+    }
+
+    PcapsResponse response = new PcapsResponse();
+    try {
+      if(startTime < 0) {
+        startTime = 0L;
+      }
+      if(endTime < 0) {
+        endTime = System.currentTimeMillis();
+      }
+      //convert to nanoseconds since the epoch
+      startTime = TimestampConverters.MILLISECONDS.toNanoseconds(startTime);
+      endTime = TimestampConverters.MILLISECONDS.toNanoseconds(endTime);
+      // Plain puts instead of double-brace initialization: no anonymous EnumMap
+      // subclass, and no hidden reference back to this resource instance.
+      EnumMap<Constants.Fields, String> query = new EnumMap<Constants.Fields, String>(Constants.Fields.class);
+      if(srcIp != null) {
+        query.put(Constants.Fields.SRC_ADDR, srcIp);
+      }
+      if(dstIp != null) {
+        query.put(Constants.Fields.DST_ADDR, dstIp);
+      }
+      // both ports passed isValidPort above, so they are known to be non-null
+      query.put(Constants.Fields.SRC_PORT, srcPort);
+      query.put(Constants.Fields.DST_PORT, dstPort);
+      if(protocol != null) {
+        query.put(Constants.Fields.PROTOCOL, protocol);
+      }
+      query.put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, String.valueOf(includeReverseTraffic));
+      if(LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Query received: " + Joiner.on(",").join(query.entrySet()));
+      }
+      Configuration conf = CONFIGURATION.get();
+      response.setPcaps(getQueryUtil().query(
+              new org.apache.hadoop.fs.Path(ConfigurationUtil.getPcapOutputPath()),
+              new org.apache.hadoop.fs.Path(ConfigurationUtil.getTempQueryOutputPath()),
+              startTime, endTime, query, conf, FileSystem.get(conf)));
+    } catch (Exception e) {
+      LOGGER.error("Exception occurred while fetching Pcaps by identifiers :", e);
+      throw new WebApplicationException("Unable to fetch Pcaps via MR job", e);
+    }
+
+    // return http status '200 OK' along with the complete pcaps response file
+    return Response
+            .ok(response.getPcaps(), MediaType.APPLICATION_OCTET_STREAM)
+            .status(200).build();
+  }
+  /**
+   * Flattens a list of comma-separated key strings into a single list of
+   * individual keys. Each entry is trimmed and then split on ','; the
+   * resulting tokens are appended in the order they are encountered.
+   *
+   * @param keys entries that may each hold several comma-separated keys
+   * @return a new list holding every token from every entry
+   */
+  @VisibleForTesting
+  List<String> parseKeys(List<String> keys) {
+    final List<String> flattened = new ArrayList<String>();
+    for (final String rawEntry : keys) {
+      final String trimmedEntry = StringUtils.trim(rawEntry);
+      final String[] tokens = StringUtils.split(trimmedEntry, ",");
+      flattened.addAll(Arrays.asList(tokens));
+    }
+    return flattened;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapScannerHBaseImpl.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapScannerHBaseImpl.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapScannerHBaseImpl.java
deleted file mode 100644
index f163408..0000000
--- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapScannerHBaseImpl.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.pcapservice;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.NoServerForRegionException;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.log4j.Logger;
-import org.springframework.util.Assert;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.metron.pcap.PcapMerger;
-
-/**
- * Singleton class which integrates with HBase table and returns sorted pcaps
- * based on the timestamp for the given range of keys. Creates HConnection if it
- * is not already created and the same connection instance is being used for all
- * requests
- * 
- * @author sheetal
- * @version $Revision: 1.0 $
- */
-public class PcapScannerHBaseImpl implements IPcapScanner {
-
-  /** The Constant LOGGER. */
-  private static final Logger LOGGER = Logger
-      .getLogger(PcapScannerHBaseImpl.class);
-
-  /** The Constant DEFAULT_HCONNECTION_RETRY_LIMIT. */
-  private static final int DEFAULT_HCONNECTION_RETRY_LIMIT = 0;
-
-  /** The pcap scanner h base. */
-  private static IPcapScanner pcapScannerHBase = null;
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see com.cisco.opensoc.hbase.client.IPcapScanner#getPcaps(java.lang.String,
-   * java.lang.String, long, long, long)
-   */
-  
-  public byte[] getPcaps(String startKey, String endKey, long maxResultSize,
-      long startTime, long endTime) throws IOException {
-    Assert.hasText(startKey, "startKey must no be null or empty");
-    byte[] cf = Bytes.toBytes(ConfigurationUtil.getConfiguration()
-        .getString("hbase.table.column.family"));
-    byte[] cq = Bytes.toBytes(ConfigurationUtil.getConfiguration()
-        .getString("hbase.table.column.qualifier"));
-    // create scan request
-    Scan scan = createScanRequest(cf, cq, startKey, endKey, maxResultSize,
-        startTime, endTime);
-    List<byte[]> pcaps = new ArrayList<byte[]>();
-    HTable table = null;
-    try {
-      pcaps = scanPcaps(pcaps, table, scan, cf, cq);
-    } catch (IOException e) {
-      LOGGER.error(
-          "Exception occurred while fetching Pcaps for the key range : startKey="
-              + startKey + ", endKey=" + endKey, e);
-      if (e instanceof ZooKeeperConnectionException
-          || e instanceof MasterNotRunningException
-          || e instanceof NoServerForRegionException) {
-        int maxRetryLimit = getConnectionRetryLimit();
-        for (int attempt = 1; attempt <= maxRetryLimit; attempt++) {
-          try {
-            HBaseConfigurationUtil.closeConnection(); // closing the existing
-                                                      // connection and retry,
-                                                      // it will create a new
-                                                      // HConnection
-            pcaps = scanPcaps(pcaps, table, scan, cf, cq);
-            break;
-          } catch (IOException ie) {
-            if (attempt == maxRetryLimit) {
-              System.out.println("Throwing the exception after retrying "
-                  + maxRetryLimit + " times.");
-              throw e;
-            }
-          }
-        }
-      } else {
-        throw e;
-      }
-    } finally {
-      if (table != null) {
-        table.close();
-      }
-    }
-    if (pcaps.size() == 1) {
-      return pcaps.get(0);
-    }
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    PcapMerger.merge(baos, pcaps);
-    byte[] response = baos.toByteArray();
-    return response;
-  }
-
-  /**
-   * Creates the scan request.
-   * 
-   * @param cf
-   *          the cf
-   * @param cq
-   *          the cq
-   * @param startKey
-   *          the start key
-   * @param endKey
-   *          the end key
-   * @param maxResultSize
-   *          the max result size
-   * @param startTime
-   *          the start time
-   * @param endTime
-   *          the end time
-   * @return the scan
-   * @throws IOException
-   *           Signals that an I/O exception has occurred.
-   */
-  @VisibleForTesting
-  Scan createScanRequest(byte[] cf, byte[] cq, String startKey, String endKey,
-      long maxResultSize, long startTime, long endTime) throws IOException {
-    Scan scan = new Scan();
-    scan.addColumn(cf, cq);
-    scan.setMaxVersions(ConfigurationUtil.getConfiguration().getInt(
-        "hbase.table.column.maxVersions"));
-    scan.setStartRow(startKey.getBytes());
-    if (endKey != null) {
-      scan.setStopRow(endKey.getBytes());
-    }
-    scan.setMaxResultSize(maxResultSize);
-    boolean setTimeRange = true;
-    if (startTime < 0 && endTime < 0) {
-      setTimeRange = false;
-    }
-    if (setTimeRange) {
-      if (startTime < 0) {
-        startTime = 0;
-      } else {
-        startTime = PcapHelper.convertToDataCreationTimeUnit(startTime);
-      }
-      if (endTime < 0) {
-        endTime = Long.MAX_VALUE;
-      } else {
-        endTime = PcapHelper.convertToDataCreationTimeUnit(endTime);
-      }
-      Assert.isTrue(startTime < endTime,
-          "startTime value must be less than endTime value");
-    }
-    // create Scan request;
-    if (setTimeRange) {
-      scan.setTimeRange(startTime, endTime);
-    }
-    return scan;
-  }
-
-  /**
-   * Scan pcaps.
-   * 
-   * @param pcaps
-   *          the pcaps
-   * @param table
-   *          the table
-   * @param scan
-   *          the scan
-   * @param cf
-   *          the cf
-   * @param cq
-   *          the cq
-   * @return the list
-   * @throws IOException
-   *           Signals that an I/O exception has occurred.
-   */
-  @VisibleForTesting
-  List<byte[]> scanPcaps(List<byte[]> pcaps, HTable table, Scan scan,
-      byte[] cf, byte[] cq) throws IOException {
-    LOGGER.info("Scan =" + scan.toString());
-    table = (HTable) HBaseConfigurationUtil.getConnection().getTable(
-    		ConfigurationUtil.getConfiguration().getString("hbase.table.name"));
-    ResultScanner resultScanner = table.getScanner(scan);
-    List<Cell> scannedCells = new ArrayList<Cell>();
-    for (Result result = resultScanner.next(); result != null; result = resultScanner
-        .next()) {
-      List<Cell> cells = result.getColumnCells(cf, cq);
-      if (cells != null) {
-        for (Cell cell : cells) {
-          scannedCells.add(cell);
-        }
-      }
-    }
-    Collections.sort(scannedCells, PcapHelper.getCellTimestampComparator());
-    LOGGER.info("sorted cells :" + scannedCells.toString());
-    for (Cell sortedCell : scannedCells) {
-      pcaps.add(CellUtil.cloneValue(sortedCell));
-    }
-    return pcaps;
-  }
-
-  /**
-   * Gets the connection retry limit.
-   * 
-   * @return the connection retry limit
-   */
-  private int getConnectionRetryLimit() {
-    return ConfigurationUtil.getConfiguration().getInt(
-        "hbase.hconnection.retries.number", DEFAULT_HCONNECTION_RETRY_LIMIT);
-  }
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see com.cisco.opensoc.hbase.client.IPcapScanner#getPcaps(java.lang.String,
-   * java.lang.String)
-   */
-  
-  public byte[] getPcaps(String startKey, String endKey) throws IOException {
-    Assert.hasText(startKey, "startKey must no be null or empty");
-    Assert.hasText(endKey, "endKey must no be null or empty");
-    return getPcaps(startKey, endKey, ConfigurationUtil.getDefaultResultSize(),
-        -1, -1);
-  }
-
-  /**
-   * Always returns the singleton instance.
-   * 
-   * @return IPcapScanner singleton instance
-   * @throws IOException
-   *           Signals that an I/O exception has occurred.
-   */
-  public static IPcapScanner getInstance() throws IOException {
-    if (pcapScannerHBase == null) {
-      synchronized (PcapScannerHBaseImpl.class) {
-        if (pcapScannerHBase == null) {
-          pcapScannerHBase = new PcapScannerHBaseImpl();
-        }
-      }
-    }
-    return pcapScannerHBase;
-  }
-
-  /**
-   * Instantiates a new pcap scanner h base impl.
-   */
-  private PcapScannerHBaseImpl() {
-  }
-
-  /**
-   * The main method.
-   */
-  // public static void main(String[] args) throws IOException {
-  // if (args == null || args.length < 3) {
-  // usage();
-  // return;
-  // }
-  // String outputFileName = null;
-  // String startKey = null;
-  // String stopKey = null;
-  // outputFileName = args[0];
-  // startKey = args[1];
-  // if (args.length > 2) { // NOPMD by sheetal on 1/29/14 3:55 PM
-  // stopKey = args[2];
-  // }
-  // PcapScannerHBaseImpl downloader = new PcapScannerHBaseImpl();
-  // byte[] pcaps = downloader.getPcaps(startKey, stopKey, defaultResultSize, 0,
-  // Long.MAX_VALUE);
-  // File file = new File(outputFileName);
-  // FileUtils.write(file, "", false);
-  // ByteArrayOutputStream baos = new ByteArrayOutputStream(); //
-  // $codepro.audit.disable
-  // // closeWhereCreated
-  // PcapMerger.merge(baos, pcaps);
-  // FileUtils.writeByteArrayToFile(file, baos.toByteArray(), true);
-  // }
-
-  /**
-   * Usage.
-   */
-  @SuppressWarnings("unused")
-  private static void usage() {
-    System.out.println("java " + PcapScannerHBaseImpl.class.getName() // NOPMD
-                                                                      // by
-        // sheetal
-        // <!-- //
-        // $codepro.audit.disable
-        // debuggingCode
-        // -->
-        // on
-        // 1/29/14
-        // 3:55
-        // PM
-        + " <zk quorum> <output file> <start key> [stop key]");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapsResponse.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapsResponse.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapsResponse.java
index b2fada1..a5f825d 100644
--- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapsResponse.java
+++ b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapsResponse.java
@@ -73,58 +73,6 @@ public class PcapsResponse {
     this.pcaps.add(pcaps);
   }
 
-  /**
-   * Gets the partial response key.
-   * 
-   * @return the partial response key
-   */
-  public String getLastRowKey() {
-    return lastRowKey;
-  }
-
-  /**
-   * Sets the partial response key.
-   * 
-   * @param lastRowKey
-   *          the last row key
-   */
-  public void setLastRowKey(String lastRowKey) {
-    this.lastRowKey = lastRowKey;
-  }
-
-  /**
-   * Gets the status.
-   * 
-   * @return the status
-   */
-  public Status getStatus() {
-    return status;
-  }
-
-  /**
-   * Sets the status.
-   * 
-   * @param status
-   *          the new status
-   */
-  public void setStatus(Status status) {
-    this.status = status;
-  }
-
-  /**
-   * Checks if is resonse size within limit.
-   * 
-   * @param maxResultSize
-   *          the max result size
-   * @return true, if is resonse size within limit
-   */
-  public boolean isResonseSizeWithinLimit(long maxResultSize) {
-    // System.out.println("isResonseSizeWithinLimit() : getResponseSize() < (input|default result size - maximum packet size ) ="+
-    // getResponseSize()+ " < " + ( maxResultSize
-    // -ConfigurationUtil.getMaxRowSize()));
-    return getResponseSize() < (maxResultSize - ConfigurationUtil
-        .getMaxRowSize());
-  }
 
   /**
    * Gets the response size.
@@ -147,6 +95,9 @@ public class PcapsResponse {
    *           Signals that an I/O exception has occurred.
    */
   public byte[] getPcaps() throws IOException {
+    if(pcaps == null) {
+      return new byte[] {};
+    }
     if (pcaps.size() == 1) {
       return pcaps.get(0);
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/rest/JettyServiceRunner.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/rest/JettyServiceRunner.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/rest/JettyServiceRunner.java
index f4fb27c..2a930b8 100644
--- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/rest/JettyServiceRunner.java
+++ b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/rest/JettyServiceRunner.java
@@ -17,12 +17,13 @@
  */
 package org.apache.metron.pcapservice.rest;
 
+import org.apache.metron.pcapservice.PcapReceiverImplRestEasy;
+
 import java.util.HashSet;
 import java.util.Set;
 
 import javax.ws.rs.core.Application;
 
-import org.apache.metron.pcapservice.PcapReceiverImplRestEasy;
 
 public class JettyServiceRunner extends Application  {
 	
@@ -31,7 +32,7 @@ public class JettyServiceRunner extends Application  {
 		
 	public  JettyServiceRunner() {     
 		// initialize restful services   
-		services.add(new PcapReceiverImplRestEasy());  
+		services.add(new PcapReceiverImplRestEasy());
 	}
 	@Override
 	public  Set getSingletons() {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/rest/PcapService.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/rest/PcapService.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/rest/PcapService.java
index b5832b6..6773acb 100644
--- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/rest/PcapService.java
+++ b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/rest/PcapService.java
@@ -19,33 +19,39 @@ package org.apache.metron.pcapservice.rest;
 
 import java.io.IOException;
 
+import org.apache.log4j.Logger;
+import org.apache.metron.api.helper.service.PcapServiceCli;
+import org.apache.metron.pcapservice.ConfigurationUtil;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.jboss.resteasy.plugins.server.servlet.HttpServletDispatcher;
 
-import org.apache.metron.api.helper.service.PcapServiceCli;
 
 
 public class PcapService {
 
-	public static void main(String[] args) throws IOException {
+  private static final Logger LOG = Logger.getLogger(PcapService.class);
+  public static void main(String[] args) throws IOException {
 
-		PcapServiceCli cli = new PcapServiceCli(args);
-		cli.parse();
-		
-		Server server = new Server(cli.getPort());
-		ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
-		context.setContextPath("/");
-		ServletHolder h = new ServletHolder(new HttpServletDispatcher());
-		h.setInitParameter("javax.ws.rs.Application", "org.apache.metron.pcapservice.rest.JettyServiceRunner");
-		context.addServlet(h, "/*");
-		server.setHandler(context);
-		try {
-			server.start();
-			server.join();
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-	}
+    PcapServiceCli cli = new PcapServiceCli(args);
+    cli.parse();
+    ConfigurationUtil.setPcapOutputPath(cli.getPcapHdfsPath());
+    LOG.info("Pcap location set to " + cli.getPcapHdfsPath());
+    ConfigurationUtil.setTempQueryOutputPath(cli.getQueryHdfsPath());
+    LOG.info("Query temp location set to " + cli.getQueryHdfsPath());
+    Server server = new Server(cli.getPort());
+    ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+    context.setContextPath("/");
+    ServletHolder h = new ServletHolder(new HttpServletDispatcher());
+    h.setInitParameter("javax.ws.rs.Application", "org.apache.metron.pcapservice.rest.JettyServiceRunner");
+    context.addServlet(h, "/*");
+    server.setHandler(context);
+    try {
+      server.start();
+      server.join();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-api/src/main/resources/config-definition-hbase.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/main/resources/config-definition-hbase.xml b/metron-platform/metron-api/src/main/resources/config-definition-hbase.xml
deleted file mode 100644
index 98ece42..0000000
--- a/metron-platform/metron-api/src/main/resources/config-definition-hbase.xml
+++ /dev/null
@@ -1,50 +0,0 @@
-<?xml version="1.0" encoding="ISO-8859-1" ?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-<configuration>
-	<header>
-		<result delimiterParsingDisabled="true" forceReloadCheck="true"></result>
-		<lookups>
-      		<lookup config-prefix="expr"
-              	config-class="org.apache.commons.configuration.interpol.ExprLookup">
-        		<variables>
-          			<variable name="System" value="Class:java.lang.System"/>
-          			<variable name="net" value="Class:java.net.InetAddress"/>
-          			<variable name="String" value="Class:org.apache.commons.lang.StringUtils"/>
-        		</variables>
-      		</lookup>
-    	</lookups>
-	</header>
-	<override>
-		<!-- 1. properties from 'hbae-config.properties' are loaded first;
-				if a property is not present in this file, then it will search in the files in the order they are defined here.
-		     2. 'refreshDelay' indicates the minimum delay in milliseconds between checks to see if the underlying file is changed.
-		     3. 'config-optional' indicates this file is not required -->
-
-		<properties fileName="${expr:System.getProperty('configPath')+'/hbase-config.properties'}"  config-optional="true">
-			<reloadingStrategy refreshDelay="${expr:System.getProperty('configRefreshDelay')}"
-	      config-class="org.apache.commons.configuration.reloading.FileChangedReloadingStrategy"/>
-	     </properties>
-
-		<properties fileName="hbase-config-default.properties" config-optional="true">
-<!-- 					<reloadingStrategy refreshDelay="${expr:System.getProperty('defaultConfigRefreshDelay')}"
-	      config-class="org.apache.commons.configuration.reloading.FileChangedReloadingStrategy"/>
- -->	     </properties>
-
-	</override>
-</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-api/src/main/resources/hbase-config-default.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/main/resources/hbase-config-default.properties b/metron-platform/metron-api/src/main/resources/hbase-config-default.properties
deleted file mode 100644
index 0f47193..0000000
--- a/metron-platform/metron-api/src/main/resources/hbase-config-default.properties
+++ /dev/null
@@ -1,57 +0,0 @@
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-
-
-#hbase zoo keeper configuration
-hbase.zookeeper.quorum=zkpr1,zkpr2,zkpr3
-hbase.zookeeper.clientPort=2181
-hbase.client.retries.number=1
-zookeeper.session.timeout=60000
-zookeeper.recovery.retry=0
-
-#hbase table configuration
-hbase.table.name=pcap
-hbase.table.column.family=t
-hbase.table.column.qualifier=value
-hbase.table.column.maxVersions=5
-
-# scan size limit configuration in MB or KB; if the input is negative or greater than max value throw an error.
-hbase.scan.result.size.unit=MB
-hbase.scan.default.result.size=6
-hbase.scan.max.result.size=60
-
-# time stamp conversion configuration; possible values 'SECONDS'(seconds), 'MILLIS'(milli seconds), 'MICROS' (micro seconds)
-hbase.table.data.time.unit=MILLIS
-
-#number of retries in case of ZooKeeper or HBase server down
-hbase.hconnection.retries.number=3
-
-#configuration for including pcaps in the reverse traffic
-pcaps.include.reverse.traffic = false
-
-#maximum table row size in KB or MB 
-hbase.table.row.size.unit = KB
-hbase.table.max.row.size = 70
-
-# tokens of row key configuration
-hbase.table.row.key.tokens=7
-rest.api.input.key.min.tokens=5
-
-# whether or not to include the last row from the previous request, applicable for only partial response scenario
-hbase.table.scan.include.duplicate.lastrow= true;
-
-#number of digits for appending tokens of the row key
-hbase.table.row.key.token.appending.digits=5

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/CellTimestampComparatorTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/CellTimestampComparatorTest.java b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/CellTimestampComparatorTest.java
deleted file mode 100644
index 9d10c92..0000000
--- a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/CellTimestampComparatorTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.pcapservice;
-
-import junit.framework.Assert;
-
-import org.apache.hadoop.hbase.Cell;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import org.apache.metron.pcapservice.CellTimestampComparator;
-
-/**
- * The Class CellTimestampComparatorTest.
- */
-public class CellTimestampComparatorTest {
-
-  /**
-   * Sets the up.
-   * 
-   * @throws Exception
-   *           the exception
-   */
-  @Before
-  public void setUp() throws Exception {
-  }
-
-  /**
-   * Tear down.
-   * 
-   * @throws Exception
-   *           the exception
-   */
-  @After
-  public void tearDown() throws Exception {
-  }
-
-  /**
-   * Test_less.
-   */
-  @Test
-  public void test_less() {
-    // mocking
-    Cell cell1 = Mockito.mock(Cell.class);
-    Mockito.when(cell1.getTimestamp()).thenReturn(13945345808L);
-    Cell cell2 = Mockito.mock(Cell.class);
-    Mockito.when(cell2.getTimestamp()).thenReturn(13845345808L);
-
-    CellTimestampComparator comparator = new CellTimestampComparator();
-
-    // actual call and verify
-    Assert.assertTrue(comparator.compare(cell2, cell1) == -1);
-
-  }
-
-  /**
-   * Test_greater.
-   */
-  @Test
-  public void test_greater() {
-    // mocking
-    Cell cell1 = Mockito.mock(Cell.class);
-    Mockito.when(cell1.getTimestamp()).thenReturn(13745345808L);
-    Cell cell2 = Mockito.mock(Cell.class);
-    Mockito.when(cell2.getTimestamp()).thenReturn(13945345808L);
-
-    CellTimestampComparator comparator = new CellTimestampComparator();
-
-    // actual call and verify
-    Assert.assertTrue(comparator.compare(cell2, cell1) == 1);
-
-  }
-
-  /**
-   * Test_equal.
-   */
-  @Test
-  public void test_equal() {
-    // mocking
-    Cell cell1 = Mockito.mock(Cell.class);
-    Mockito.when(cell1.getTimestamp()).thenReturn(13945345808L);
-    Cell cell2 = Mockito.mock(Cell.class);
-    Mockito.when(cell2.getTimestamp()).thenReturn(13945345808L);
-
-    CellTimestampComparator comparator = new CellTimestampComparator();
-
-    // actual call and verify
-    Assert.assertTrue(comparator.compare(cell2, cell1) == 0);
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/ConfigurationUtilTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/ConfigurationUtilTest.java b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/ConfigurationUtilTest.java
index 3424425..3c0a77b 100644
--- a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/ConfigurationUtilTest.java
+++ b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/ConfigurationUtilTest.java
@@ -17,51 +17,29 @@
  */
 package org.apache.metron.pcapservice;
 
+import org.junit.Assert;
 import org.junit.Test;
 
-import org.apache.metron.pcapservice.ConfigurationUtil;
-import org.apache.metron.pcapservice.ConfigurationUtil.SizeUnit;
-import org.springframework.util.Assert;
 
-/**
- * The Class ConfigurationUtilTest.
- */
-public class ConfigurationUtilTest {
 
-  /**
-   * Test_get max allowable result size in bytes.
-   */
-  @Test
-  public void test_getMaxAllowableResultSizeInBytes() {
-    long result = ConfigurationUtil.getMaxResultSize();
-    Assert.isTrue(result == 62914560);
-  }
+public class ConfigurationUtilTest {
 
-  /**
-   * Test_get max allowable results size unit.
-   */
-  @Test
-  public void test_getMaxAllowableResultsSizeUnit() {
-    SizeUnit result = ConfigurationUtil.getResultSizeUnit();
-    Assert.isTrue(SizeUnit.MB == result);
-  }
 
-  /**
-   * Test_get max row size in bytes.
-   */
   @Test
-  public void test_getMaxRowSizeInBytes() {
-    long result = ConfigurationUtil.getMaxRowSize();
-    Assert.isTrue(result == 71680);
+  public void test_getPcapOutputPath() {
+    Assert.assertEquals(ConfigurationUtil.getPcapOutputPath(), null);
+    ConfigurationUtil.setPcapOutputPath("/foo");
+    Assert.assertEquals(ConfigurationUtil.getPcapOutputPath(), "/foo");
   }
 
   /**
-   * Test_get max row size unit.
+   * Test_get max allowable results size unit.
    */
   @Test
-  public void test_getMaxRowSizeUnit() {
-    SizeUnit result = ConfigurationUtil.getRowSizeUnit();
-    Assert.isTrue(SizeUnit.KB == result);
+  public void test_getTempQueryDir() {
+    Assert.assertEquals(ConfigurationUtil.getTempQueryOutputPath(), null);
+    ConfigurationUtil.setTempQueryOutputPath("/tmp");
+    Assert.assertEquals(ConfigurationUtil.getTempQueryOutputPath(), "/tmp");
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/HBaseConfigurationUtilTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/HBaseConfigurationUtilTest.java b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/HBaseConfigurationUtilTest.java
deleted file mode 100644
index 75ac782..0000000
--- a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/HBaseConfigurationUtilTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.pcapservice;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.springframework.util.Assert;
-
-import org.apache.metron.pcapservice.HBaseConfigurationUtil;
-
-/**
- * The Class HBaseConfigurationUtilTest.
- */
-public class HBaseConfigurationUtilTest {
-
-  /**
-   * Sets the up.
-   * 
-   * @throws Exception
-   *           the exception
-   */
-  @Before
-  public void setUp() throws Exception {
-  }
-
-  /**
-   * Tear down.
-   * 
-   * @throws Exception
-   *           the exception
-   */
-  @After
-  public void tearDown() throws Exception {
-  }
-
-  /**
-   * Test_read.
-   * 
-   * @throws IOException
-   *           Signals that an I/O exception has occurred.
-   */
-  @Test
-  public void test_read() throws IOException {
-    Configuration configuration = HBaseConfigurationUtil.read();
-    Assert.isTrue(configuration != null, "Configuration must not be null");
-    Assert.isTrue(configuration.get("hbase.client.retries.number").equals("1"),
-        "value must be equal");
-  }
-
-}