You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/04/29 18:28:35 UTC
[5/6] incubator-metron git commit: METRON-119 - Move PCAP
infrastructure from HBase closes apache/incubator-metron#93
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/IPcapScanner.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/IPcapScanner.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/IPcapScanner.java
deleted file mode 100644
index 4101328..0000000
--- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/IPcapScanner.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.pcapservice;
-
-import java.io.IOException;
-
-/**
- * The Interface for all pcaps fetching methods based on key range.
- */
-public interface IPcapScanner {
-
- /**
- * Gets the pcaps for between startKey (inclusive) and endKey (exclusive).
- *
- * @param startKey
- * the start key of a key range for which pcaps is to be retrieved.
- * @param endKey
- * the end key of a key range for which pcaps is to be retrieved.
- * @param maxResponseSize
- * indicates the maximum response size in MegaBytes(MB). User needs
- * to pass positive value and must be less than 60 (MB)
- * @param startTime
- * the start time in system milliseconds to be used to filter the
- * pcaps. The value is set to '0' if the caller sends negative value
- * @param endTime
- * the end time in system milliseconds to be used to filter the
- * pcaps. The value is set Long.MAX_VALUE if the caller sends
- * negative value
- * @return byte array with all matching pcaps merged together
- * @throws IOException
- * Signals that an I/O exception has occurred.
- */
- public byte[] getPcaps(String startKey, String endKey, long maxResponseSize,
- long startTime, long endTime) throws IOException;
-
- /**
- * Gets the pcaps for between startKey (inclusive) and endKey (exclusive).
- *
- * @param startKey
- * the start key (inclusive) of a key range for which pcaps is to be
- * retrieved.
- * @param endKey
- * the end key (exclusive) of a key range for which pcaps is to be
- * retrieved.
- * @return byte array with all matching pcaps merged together
- * @throws IOException
- * Signals that an I/O exception has occurred.
- */
- public byte[] getPcaps(String startKey, String endKey) throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapGetterHBaseImpl.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapGetterHBaseImpl.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapGetterHBaseImpl.java
deleted file mode 100644
index 58fecb9..0000000
--- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapGetterHBaseImpl.java
+++ /dev/null
@@ -1,826 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.pcapservice;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.annotation.Resource;
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.Response;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.NoServerForRegionException;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.log4j.Logger;
-import org.springframework.util.Assert;
-import org.springframework.util.CollectionUtils;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * Singleton class which integrates with HBase table and returns pcaps sorted by
- * timestamp(dsc) for the given list of keys. Creates HConnection if it is not
- * already created and the same connection instance is being used for all
- * requests
- *
- * @author sheetal
- * @version $Revision: 1.0 $
- */
-
-@Path("/")
-public class PcapGetterHBaseImpl implements IPcapGetter {
-
- /** The pcap getter h base. */
- private static IPcapGetter pcapGetterHBase = null;
-
- /** The Constant LOG. */
- private static final Logger LOGGER = Logger
- .getLogger(PcapGetterHBaseImpl.class);
-
- /*
- * (non-Javadoc)
- *
- * @see com.cisco.opensoc.hbase.client.IPcapGetter#getPcaps(java.util.List,
- * java.lang.String, long, long, boolean, boolean, long)
- */
-
-
- @GET
- @Path("pcap/test")
- @Produces("text/html")
- public Response index() throws URISyntaxException {
- return Response.ok("ALL GOOD").build();
- }
-
-
- public PcapsResponse getPcaps(List<String> keys, String lastRowKey,
- long startTime, long endTime, boolean includeReverseTraffic,
- boolean includeDuplicateLastRow, long maxResultSize) throws IOException {
- Assert
- .isTrue(
- checkIfValidInput(keys, lastRowKey),
- "No valid input. One of the value must be present from {keys, lastRowKey}");
- LOGGER.info(" keys=" + keys.toString() + "; lastRowKey="
- + lastRowKey);
-
- PcapsResponse pcapsResponse = new PcapsResponse();
- // 1. Process partial response key
- if (StringUtils.isNotEmpty(lastRowKey)) {
- pcapsResponse = processKey(pcapsResponse, lastRowKey, startTime,
- endTime, true, includeDuplicateLastRow, maxResultSize);
- // LOGGER.debug("after scanning lastRowKey=" +
- // pcapsResponse.toString()+"*********************************************************************");
- if (pcapsResponse.getStatus() == PcapsResponse.Status.PARTIAL) {
- return pcapsResponse;
- }
- }
- // 2. Process input keys
- List<String> sortedKeys = sortKeysByAscOrder(keys, includeReverseTraffic);
- List<String> unprocessedKeys = new ArrayList<String>();
- unprocessedKeys.addAll(sortedKeys);
- if (StringUtils.isNotEmpty(lastRowKey)) {
- unprocessedKeys.clear();
- unprocessedKeys = getUnprocessedSublistOfKeys(sortedKeys,
- lastRowKey);
- }
- LOGGER.info("unprocessedKeys in getPcaps" + unprocessedKeys.toString());
- if (!CollectionUtils.isEmpty(unprocessedKeys)) {
- for (int i = 0; i < unprocessedKeys.size(); i++) {
- pcapsResponse = processKey(pcapsResponse, unprocessedKeys.get(i),
- startTime, endTime, false, includeDuplicateLastRow, maxResultSize);
- // LOGGER.debug("after scanning input unprocessedKeys.get(" + i + ") ="
- // +
- // pcapsResponse.toString()+"*********************************************************************");
- if (pcapsResponse.getStatus() == PcapsResponse.Status.PARTIAL) {
- return pcapsResponse;
- }
- }
- }
- return pcapsResponse;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see com.cisco.opensoc.hbase.client.IPcapGetter#getPcaps(java.lang.String, long,
- * long, boolean)
- */
-
- public PcapsResponse getPcaps(String key, long startTime, long endTime,
- boolean includeReverseTraffic) throws IOException {
- Assert.hasText(key, "key must not be null or empty");
- return getPcaps(Arrays.asList(key), null, startTime, endTime,
- includeReverseTraffic, false, ConfigurationUtil.getDefaultResultSize());
- }
-
- /*
- * (non-Javadoc)
- *
- * @see com.cisco.opensoc.hbase.client.IPcapGetter#getPcaps(java.util.List)
- */
-
- public PcapsResponse getPcaps(List<String> keys) throws IOException {
- Assert.notEmpty(keys, "'keys' must not be null or empty");
- return getPcaps(keys, null, -1, -1,
- ConfigurationUtil.isDefaultIncludeReverseTraffic(), false,
- ConfigurationUtil.getDefaultResultSize());
- }
-
- /*
- * (non-Javadoc)
- *
- * @see com.cisco.opensoc.hbase.client.IPcapGetter#getPcaps(java.lang.String)
- */
-
- public PcapsResponse getPcaps(String key) throws IOException {
- Assert.hasText(key, "key must not be null or empty");
- return getPcaps(Arrays.asList(key), null, -1, -1,
- ConfigurationUtil.isDefaultIncludeReverseTraffic(), false,
- ConfigurationUtil.getDefaultResultSize());
- }
-
- /**
- * Always returns the singleton instance.
- *
- * @return IPcapGetter singleton instance
- * @throws IOException
- * Signals that an I/O exception has occurred.
- */
- public static IPcapGetter getInstance() throws IOException {
- if (pcapGetterHBase == null) {
- synchronized (PcapGetterHBaseImpl.class) {
- if (pcapGetterHBase == null) {
- pcapGetterHBase = new PcapGetterHBaseImpl();
- }
- }
- }
- return pcapGetterHBase;
- }
-
- /**
- * Instantiates a new pcap getter h base impl.
- */
- private PcapGetterHBaseImpl() {
- }
-
- /**
- * Adds reverse keys to the list if the flag 'includeReverseTraffic' is set to
- * true; removes duplicates and sorts the list by ascending order;.
- *
- * @param keys
- * input keys
- * @param includeReverseTraffic
- * flag whether or not to include reverse traffic
- * @return List<String>
- */
- @VisibleForTesting
- List<String> sortKeysByAscOrder(List<String> keys,
- boolean includeReverseTraffic) {
- Assert.notEmpty(keys, "'keys' must not be null");
- if (includeReverseTraffic) {
- keys.addAll(PcapHelper.reverseKey(keys));
- }
- List<String> deDupKeys = removeDuplicateKeys(keys);
- Collections.sort(deDupKeys);
- return deDupKeys;
- }
-
- /**
- * Removes the duplicate keys.
- *
- * @param keys
- * the keys
- * @return the list
- */
- @VisibleForTesting
-public
- List<String> removeDuplicateKeys(List<String> keys) {
- Set<String> set = new HashSet<String>(keys);
- return new ArrayList<String>(set);
- }
-
- /**
- * <p>
- * Returns the sublist starting from the element after the lastRowKey
- * to the last element in the list; if the 'lastRowKey' is not matched
- * the complete list will be returned.
- * </p>
- *
- * <pre>
- * Eg :
- * keys = [18800006-1800000b-06-0019-caac, 18800006-1800000b-06-0050-5af6, 18800006-1800000b-11-0035-3810]
- * lastRowKey = "18800006-1800000b-06-0019-caac-65140-40815"
- * and the response from this method [18800006-1800000b-06-0050-5af6, 18800006-1800000b-11-0035-3810]
- * </pre>
- *
- * @param keys
- * keys
- * @param lastRowKey
- * last row key of the previous partial response
- * @return List<String>
- */
- @VisibleForTesting
- List<String> getUnprocessedSublistOfKeys(List<String> keys,
- String lastRowKey) {
- Assert.notEmpty(keys, "'keys' must not be null");
- Assert.hasText(lastRowKey, "'lastRowKey' must not be null");
- String partialKey = getTokens(lastRowKey, 5);
- int startIndex = 0;
- for (int i = 0; i < keys.size(); i++) {
- if (partialKey.equals(keys.get(i))) {
- startIndex = i + 1;
- break;
- }
- }
- List<String> unprocessedKeys = keys.subList(startIndex, keys.size());
- return unprocessedKeys;
- }
-
- /**
- * Returns the first 'noOfTokens' tokens from the given key; token delimiter
- * "-";.
- *
- * @param key
- * given key
- * @param noOfTokens
- * number of tokens to retrieve
- * @return the tokens
- */
- @VisibleForTesting
- String getTokens(String key, int noOfTokens) {
- String delimeter = HBaseConfigConstants.PCAP_KEY_DELIMETER;
- String regex = "\\" + delimeter;
- String[] keyTokens = key.split(regex);
- Assert.isTrue(noOfTokens < keyTokens.length,
- "Invalid value for 'noOfTokens'");
- StringBuffer sbf = new StringBuffer();
- for (int i = 0; i < noOfTokens; i++) {
- sbf.append(keyTokens[i]);
- if (i != (noOfTokens - 1)) {
- sbf.append(HBaseConfigConstants.PCAP_KEY_DELIMETER);
- }
-
- }
- return sbf.toString();
- }
-
- /**
- * Process key.
- *
- * @param pcapsResponse
- * the pcaps response
- * @param key
- * the key
- * @param startTime
- * the start time
- * @param endTime
- * the end time
- * @param isPartialResponse
- * the is partial response
- * @param includeDuplicateLastRow
- * the include duplicate last row
- * @param maxResultSize
- * the max result size
- * @return the pcaps response
- * @throws IOException
- * Signals that an I/O exception has occurred.
- */
- @VisibleForTesting
- PcapsResponse processKey(PcapsResponse pcapsResponse, String key,
- long startTime, long endTime, boolean isPartialResponse,
- boolean includeDuplicateLastRow, long maxResultSize) throws IOException {
- HTable table = null;
- Scan scan = null;
- List<Cell> scannedCells = null;
- try {
- // 1. Create start and stop row for the key;
- Map<String, String> keysMap = createStartAndStopRowKeys(key,
- isPartialResponse, includeDuplicateLastRow);
-
- // 2. if the input key contains all fragments (7) and it is not part
- // of previous partial response (isPartialResponse),
- // 'keysMap' will be null; do a Get; currently not doing any
- // response size related checks for Get;
- // by default all cells from a specific row are sorted by timestamp
- if (keysMap == null) {
- Get get = createGetRequest(key, startTime, endTime);
- List<Cell> cells = executeGetRequest(table, get);
- for (Cell cell : cells) {
- pcapsResponse.addPcaps(CellUtil.cloneValue(cell));
- }
- return pcapsResponse;
- }
- // 3. Create and execute Scan request
- scan = createScanRequest(pcapsResponse, keysMap, startTime, endTime,
- maxResultSize);
- scannedCells = executeScanRequest(table, scan);
- LOGGER.info("scannedCells size :" + scannedCells.size());
- addToResponse(pcapsResponse, scannedCells, maxResultSize);
-
- } catch (IOException e) {
- LOGGER.error("Exception occurred while fetching Pcaps for the keys :"
- + key, e);
- if (e instanceof ZooKeeperConnectionException
- || e instanceof MasterNotRunningException
- || e instanceof NoServerForRegionException) {
- int maxRetryLimit = ConfigurationUtil.getConnectionRetryLimit();
- System.out.println("maxRetryLimit =" + maxRetryLimit);
- for (int attempt = 1; attempt <= maxRetryLimit; attempt++) {
- System.out.println("attempting =" + attempt);
- try {
- HBaseConfigurationUtil.closeConnection(); // closing the
- // existing
- // connection
- // and retry,
- // it will
- // create a new
- // HConnection
- scannedCells = executeScanRequest(table, scan);
- addToResponse(pcapsResponse, scannedCells, maxResultSize);
- break;
- } catch (IOException ie) {
- if (attempt == maxRetryLimit) {
- LOGGER.error("Throwing the exception after retrying "
- + maxRetryLimit + " times.");
- throw e;
- }
- }
- }
- }
-
- } finally {
- if (table != null) {
- table.close();
- }
- }
- return pcapsResponse;
- }
-
- /**
- * Adds the to response.
- *
- * @param pcapsResponse
- * the pcaps response
- * @param scannedCells
- * the scanned cells
- * @param maxResultSize
- * the max result size
- */
- private void addToResponse(PcapsResponse pcapsResponse,
- List<Cell> scannedCells, long maxResultSize) {
- String lastKeyFromCurrentScan = null;
- if (scannedCells != null && scannedCells.size() > 0) {
- lastKeyFromCurrentScan = new String(CellUtil.cloneRow(scannedCells
- .get(scannedCells.size() - 1)));
- }
- // 4. calculate the response size
- Collections.sort(scannedCells, PcapHelper.getCellTimestampComparator());
- for (Cell sortedCell : scannedCells) {
- pcapsResponse.addPcaps(CellUtil.cloneValue(sortedCell));
- }
- if (!pcapsResponse.isResonseSizeWithinLimit(maxResultSize)) {
- pcapsResponse.setStatus(PcapsResponse.Status.PARTIAL); // response size
- // reached
- pcapsResponse.setLastRowKey(new String(lastKeyFromCurrentScan));
- }
- }
-
- /**
- * Builds start and stop row keys according to the following logic : 1.
- * Creates tokens out of 'key' using pcap_id delimiter ('-') 2. if the input
- * 'key' contains (assume : configuredTokensInRowKey=7 and
- * minimumTokensIninputKey=5): a). 5 tokens
- * ("srcIp-dstIp-protocol-srcPort-dstPort") startKey =
- * "srcIp-dstIp-protocol-srcPort-dstPort-00000-00000" stopKey =
- * "srcIp-dstIp-protocol-srcPort-dstPort-99999-99999" b). 6 tokens
- * ("srcIp-dstIp-protocol-srcPort-dstPort-id1") startKey =
- * "srcIp-dstIp-protocol-srcPort-dstPort-id1-00000" stopKey =
- * "srcIp-dstIp-protocol-srcPort-dstPort-id1-99999"
- *
- * c). 7 tokens ("srcIp-dstIp-protocol-srcPort-dstPort-id1-id2") 1>. if the
- * key is NOT part of the partial response from previous request, return
- * 'null' 2>. if the key is part of partial response from previous request
- * startKey = "srcIp-dstIp-protocol-srcPort-dstPort-id1-(id2+1)"; 1 is added
- * to exclude this key as it was included in the previous request stopKey =
- * "srcIp-dstIp-protocol-srcPort-dstPort-99999-99999"
- *
- * @param key
- * the key
- * @param isLastRowKey
- * if the key is part of partial response
- * @param includeDuplicateLastRow
- * the include duplicate last row
- * @return Map<String, String>
- */
- @VisibleForTesting
- Map<String, String> createStartAndStopRowKeys(String key,
- boolean isLastRowKey, boolean includeDuplicateLastRow) {
- String delimeter = HBaseConfigConstants.PCAP_KEY_DELIMETER;
- String regex = "\\" + delimeter;
- String[] keyTokens = key.split(regex);
-
- String startKey = null;
- String endKey = null;
- Map<String, String> map = new HashMap<String, String>();
-
- int configuredTokensInRowKey = ConfigurationUtil
- .getConfiguredTokensInRowkey();
- int minimumTokensIninputKey = ConfigurationUtil
- .getMinimumTokensInInputkey();
- Assert
- .isTrue(
- minimumTokensIninputKey <= configuredTokensInRowKey,
- "tokens in the input key (separated by '-'), must be less than or equal to the tokens used in hbase table row key ");
- // in case if the input key contains 'configuredTokensInRowKey' tokens and
- // it is NOT a
- // partial response key, do a Get instead of Scan
- if (keyTokens.length == configuredTokensInRowKey) {
- if (!isLastRowKey) {
- return null;
- }
- // it is a partial response key; 'startKey' is same as input partial
- // response key; 'endKey' can be built by replacing
- // (configuredTokensInRowKey - minimumTokensIninputKey) tokens
- // of input partial response key with '99999'
- if (keyTokens.length == minimumTokensIninputKey) {
- return null;
- }
- int appendingTokenSlots = configuredTokensInRowKey
- - minimumTokensIninputKey;
- if (appendingTokenSlots > 0) {
- String partialKey = getTokens(key, minimumTokensIninputKey);
- StringBuffer sbfStartNew = new StringBuffer(partialKey);
- StringBuffer sbfEndNew = new StringBuffer(partialKey);
- for (int i = 0; i < appendingTokenSlots; i++) {
- if (i == (appendingTokenSlots - 1)) {
- if (!includeDuplicateLastRow) {
- sbfStartNew
- .append(HBaseConfigConstants.PCAP_KEY_DELIMETER)
- .append(
- Integer.valueOf(keyTokens[minimumTokensIninputKey + i]) + 1);
- } else {
- sbfStartNew.append(HBaseConfigConstants.PCAP_KEY_DELIMETER)
- .append(keyTokens[minimumTokensIninputKey + i]);
- }
- } else {
- sbfStartNew.append(HBaseConfigConstants.PCAP_KEY_DELIMETER).append(
- keyTokens[minimumTokensIninputKey + i]);
- }
- sbfEndNew.append(HBaseConfigConstants.PCAP_KEY_DELIMETER).append(
- getMaxLimitForAppendingTokens());
- }
- startKey = sbfStartNew.toString();
- endKey = sbfEndNew.toString();
- }
- } else {
- StringBuffer sbfStart = new StringBuffer(key);
- StringBuffer sbfEnd = new StringBuffer(key);
- for (int i = keyTokens.length; i < configuredTokensInRowKey; i++) {
- sbfStart.append(HBaseConfigConstants.PCAP_KEY_DELIMETER).append(
- getMinLimitForAppendingTokens());
- sbfEnd.append(HBaseConfigConstants.PCAP_KEY_DELIMETER).append(
- getMaxLimitForAppendingTokens());
- }
- startKey = sbfStart.toString();
- endKey = sbfEnd.toString();
- }
- map.put(HBaseConfigConstants.START_KEY, startKey);
- map.put(HBaseConfigConstants.END_KEY, endKey);
-
- return map;
- }
-
- /**
- * Returns false if keys is empty or null AND lastRowKey is null or
- * empty; otherwise returns true;.
- *
- * @param keys
- * input row keys
- * @param lastRowKey
- * partial response key
- * @return boolean
- */
- @VisibleForTesting
- boolean checkIfValidInput(List<String> keys, String lastRowKey) {
- if (CollectionUtils.isEmpty(keys)
- && StringUtils.isEmpty(lastRowKey)) {
- return false;
- }
- return true;
- }
-
- /**
- * Executes the given Get request.
- *
- * @param table
- * hbase table
- * @param get
- * Get
- * @return List<Cell>
- * @throws IOException
- * Signals that an I/O exception has occurred.
- */
- private List<Cell> executeGetRequest(HTable table, Get get)
- throws IOException {
- LOGGER.info("Get :" + get.toString());
- table = (HTable) HBaseConfigurationUtil.getConnection().getTable(
- ConfigurationUtil.getTableName());
- Result result = table.get(get);
- List<Cell> cells = result.getColumnCells(
- ConfigurationUtil.getColumnFamily(),
- ConfigurationUtil.getColumnQualifier());
- return cells;
- }
-
- /**
- * Execute scan request.
- *
- * @param table
- * hbase table
- * @param scan
- * the scan
- * @return the list
- * @throws IOException
- * Signals that an I/O exception has occurred.
- */
- private List<Cell> executeScanRequest(HTable table, Scan scan)
- throws IOException {
- LOGGER.info("Scan :" + scan.toString());
- table = (HTable) HBaseConfigurationUtil.getConnection().getTable(
- ConfigurationUtil.getConfiguration().getString("hbase.table.name"));
- ResultScanner resultScanner = table.getScanner(scan);
- List<Cell> scannedCells = new ArrayList<Cell>();
- for (Result result = resultScanner.next(); result != null; result = resultScanner
- .next()) {
- List<Cell> cells = result.getColumnCells(
- ConfigurationUtil.getColumnFamily(),
- ConfigurationUtil.getColumnQualifier());
- if (cells != null) {
- for (Cell cell : cells) {
- scannedCells.add(cell);
- }
- }
- }
- return scannedCells;
- }
-
- /**
- * Creates the get request.
- *
- * @param key
- * the key
- * @param startTime
- * the start time
- * @param endTime
- * the end time
- * @return the gets the
- * @throws IOException
- * Signals that an I/O exception has occurred.
- */
- @VisibleForTesting
- Get createGetRequest(String key, long startTime, long endTime)
- throws IOException {
- Get get = new Get(Bytes.toBytes(key));
- // set family name
- get.addFamily(ConfigurationUtil.getColumnFamily());
-
- // set column family, qualifier
- get.addColumn(ConfigurationUtil.getColumnFamily(),
- ConfigurationUtil.getColumnQualifier());
-
- // set max versions
- get.setMaxVersions(ConfigurationUtil.getMaxVersions());
-
- // set time range
- setTimeRangeOnGet(get, startTime, endTime);
- return get;
- }
-
- /**
- * Creates the scan request.
- *
- * @param pcapsResponse
- * the pcaps response
- * @param keysMap
- * the keys map
- * @param startTime
- * the start time
- * @param endTime
- * the end time
- * @param maxResultSize
- * the max result size
- * @return the scan
- * @throws IOException
- * Signals that an I/O exception has occurred.
- */
- @VisibleForTesting
- Scan createScanRequest(PcapsResponse pcapsResponse,
- Map<String, String> keysMap, long startTime, long endTime,
- long maxResultSize) throws IOException {
- Scan scan = new Scan();
- // set column family, qualifier
- scan.addColumn(ConfigurationUtil.getColumnFamily(),
- ConfigurationUtil.getColumnQualifier());
-
- // set start and stop keys
- scan.setStartRow(keysMap.get(HBaseConfigConstants.START_KEY).getBytes());
- scan.setStopRow(keysMap.get(HBaseConfigConstants.END_KEY).getBytes());
-
- // set max results size : remaining size = max results size - ( current
- // pcaps response size + possible maximum row size)
- long remainingSize = maxResultSize
- - (pcapsResponse.getResponseSize() + ConfigurationUtil.getMaxRowSize());
-
- if (remainingSize > 0) {
- scan.setMaxResultSize(remainingSize);
- }
- // set max versions
- scan.setMaxVersions(ConfigurationUtil.getConfiguration().getInt(
- "hbase.table.column.maxVersions"));
-
- // set time range
- setTimeRangeOnScan(scan, startTime, endTime);
- return scan;
- }
-
- /**
- * Sets the time range on scan.
- *
- * @param scan
- * the scan
- * @param startTime
- * the start time
- * @param endTime
- * the end time
- * @throws IOException
- * Signals that an I/O exception has occurred.
- */
- private void setTimeRangeOnScan(Scan scan, long startTime, long endTime)
- throws IOException {
- boolean setTimeRange = true;
- if (startTime < 0 && endTime < 0) {
- setTimeRange = false;
- }
- if (setTimeRange) {
- if (startTime < 0) {
- startTime = 0;
- } else {
- startTime = PcapHelper.convertToDataCreationTimeUnit(startTime);
- }
- if (endTime < 0) {
- endTime = Long.MAX_VALUE;
- } else {
- endTime = PcapHelper.convertToDataCreationTimeUnit(endTime);
- }
- Assert.isTrue(startTime < endTime,
- "startTime value must be less than endTime value");
- scan.setTimeRange(startTime, endTime);
- }
- }
-
- /**
- * Sets the time range on get.
- *
- * @param get
- * the get
- * @param startTime
- * the start time
- * @param endTime
- * the end time
- * @throws IOException
- * Signals that an I/O exception has occurred.
- */
- private void setTimeRangeOnGet(Get get, long startTime, long endTime)
- throws IOException {
- boolean setTimeRange = true;
- if (startTime < 0 && endTime < 0) {
- setTimeRange = false;
- }
- if (setTimeRange) {
- if (startTime < 0) {
- startTime = 0;
- } else {
- startTime = PcapHelper.convertToDataCreationTimeUnit(startTime);
- }
- if (endTime < 0) {
- endTime = Long.MAX_VALUE;
- } else {
- endTime = PcapHelper.convertToDataCreationTimeUnit(endTime);
- }
- Assert.isTrue(startTime < endTime,
- "startTime value must be less than endTime value");
- get.setTimeRange(startTime, endTime);
- }
- }
-
- /**
- * Gets the min limit for appending tokens.
- *
- * @return the min limit for appending tokens
- */
- private String getMinLimitForAppendingTokens() {
- int digits = ConfigurationUtil.getAppendingTokenDigits();
- StringBuffer sbf = new StringBuffer();
- for (int i = 0; i < digits; i++) {
- sbf.append("0");
- }
- return sbf.toString();
- }
-
- /**
- * Gets the max limit for appending tokens.
- *
- * @return the max limit for appending tokens
- */
- private String getMaxLimitForAppendingTokens() {
- int digits = ConfigurationUtil.getAppendingTokenDigits();
- StringBuffer sbf = new StringBuffer();
- for (int i = 0; i < digits; i++) {
- sbf.append("9");
- }
- return sbf.toString();
- }
-
- /**
- * The main method.
- *
- * @param args
- * the arguments
- *
- * @throws IOException
- * Signals that an I/O exception has occurred.
- */
- public static void main(String[] args) throws IOException {
- if (args == null || args.length < 2) {
- usage();
- return;
- }
- String outputFileName = null;
- outputFileName = args[1];
- List<String> keys = Arrays.asList(StringUtils.split(args[2], ","));
- System.out.println("Geting keys " + keys);
- long startTime = 0;
- long endTime = Long.MAX_VALUE;
- if (args.length > 3) {
- startTime = Long.valueOf(args[3]);
- }
- if (args.length > 4) {
- endTime = Long.valueOf(args[4]);
- }
- System.out.println("With start time " + startTime + " and end time "
- + endTime);
- PcapGetterHBaseImpl downloader = new PcapGetterHBaseImpl();
- PcapsResponse pcaps = downloader.getPcaps(keys, null, startTime, endTime,
- false, false, 6);
- File file = new File(outputFileName);
- FileUtils.write(file, "", false);
- FileUtils.writeByteArrayToFile(file, pcaps.getPcaps(), true);
- }
-
- /**
- * Usage.
- */
- private static void usage() {
- System.out.println("java " + PcapGetterHBaseImpl.class.getName() // $codepro.audit.disable
- // debuggingCode
- + " <zk quorum> <output file> <start key> [stop key]");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapHelper.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapHelper.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapHelper.java
deleted file mode 100644
index 893d176..0000000
--- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapHelper.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.pcapservice;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.log4j.Logger;
-import org.mortbay.log.Log;
-import org.springframework.util.Assert;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * utility class which holds methods related to time conversions, building
- * reverse keys.
- */
-public class PcapHelper {
-
- /** The Constant LOGGER. */
- private static final Logger LOGGER = Logger.getLogger(PcapHelper.class);
-
- /** The cell timestamp comparator. */
- private static CellTimestampComparator CELL_TIMESTAMP_COMPARATOR = new CellTimestampComparator();
-
- /**
- * The Enum TimeUnit.
- */
- public enum TimeUnit {
-
- /** The seconds. */
- SECONDS,
- /** The millis. */
- MILLIS,
- /** The micros. */
- MICROS,
- /** The unknown. */
- UNKNOWN
- };
-
- /**
- * Converts the given time to the 'hbase' data creation time unit.
- *
- * @param inputTime
- * the input time
- * @return the long
- */
- public static long convertToDataCreationTimeUnit(long inputTime) {
- if (inputTime <= 9999999999L) {
- return convertSecondsToDataCreationTimeUnit(inputTime); // input time unit
- // is in seconds
- } else if (inputTime <= 9999999999999L) {
- return convertMillisToDataCreationTimeUnit(inputTime); // input time unit
- // is in millis
- } else if (inputTime <= 9999999999999999L) {
- return convertMicrosToDataCreationTimeUnit(inputTime); // input time unit
- // it in micros
- }
- return inputTime; // input time unit is unknown
- }
-
- /**
- * Returns the 'hbase' data creation time unit by reading
- * 'hbase.table.data.time.unit' property in 'hbase-config' properties file; If
- * none is mentioned in properties file, returns <code>TimeUnit.UNKNOWN</code>
- *
- * @return TimeUnit
- */
- @VisibleForTesting
- public static TimeUnit getDataCreationTimeUnit() {
- String timeUnit = ConfigurationUtil.getConfiguration().getString(
- "hbase.table.data.time.unit");
- LOGGER.debug("hbase.table.data.time.unit=" + timeUnit.toString());
- if (StringUtils.isNotEmpty(timeUnit)) {
- return TimeUnit.valueOf(timeUnit);
- }
- return TimeUnit.UNKNOWN;
- }
-
- /**
- * Convert seconds to data creation time unit.
- *
- * @param inputTime
- * the input time
- * @return the long
- */
- @VisibleForTesting
- public static long convertSecondsToDataCreationTimeUnit(long inputTime) {
- System.out.println("convert Seconds To DataCreation TimeUnit");
- TimeUnit dataCreationTimeUnit = getDataCreationTimeUnit();
- if (TimeUnit.SECONDS == dataCreationTimeUnit) {
- return inputTime;
- } else if (TimeUnit.MILLIS == dataCreationTimeUnit) {
- return inputTime * 1000;
- } else if (TimeUnit.MICROS == dataCreationTimeUnit) {
- return inputTime * 1000 * 1000;
- }
- return inputTime;
- }
-
- /**
- * Builds the reverseKey to fetch the pcaps in the reverse traffic
- * (destination to source).
- *
- * @param key
- * indicates hbase rowKey (partial or full) in the format
- * "srcAddr-dstAddr-protocol-srcPort-dstPort-fragment"
- * @return String indicates the key in the format
- * "dstAddr-srcAddr-protocol-dstPort-srcPort"
- */
- public static String reverseKey(String key) {
- Assert.hasText(key, "key must not be null or empty");
- String delimeter = HBaseConfigConstants.PCAP_KEY_DELIMETER;
- String regex = "\\" + delimeter;
- StringBuffer sb = new StringBuffer();
- try {
- String[] tokens = key.split(regex);
- Assert
- .isTrue(
- (tokens.length == 5 || tokens.length == 6 || tokens.length == 7),
- "key is not in the format : 'srcAddr-dstAddr-protocol-srcPort-dstPort-{ipId-fragment identifier}'");
- sb.append(tokens[1]).append(delimeter).append(tokens[0])
- .append(delimeter).append(tokens[2]).append(delimeter)
- .append(tokens[4]).append(delimeter).append(tokens[3]);
- } catch (Exception e) {
- Log.warn("Failed to reverse the key. Reverse scan won't be performed.", e);
- }
- return sb.toString();
- }
-
- /**
- * Builds the reverseKeys to fetch the pcaps in the reverse traffic
- * (destination to source). If all keys in the input are not in the expected
- * format, it returns an empty list;
- *
- * @param keys
- * indicates list of hbase rowKeys (partial or full) in the format
- * "srcAddr-dstAddr-protocol-srcPort-dstPort-fragment"
- * @return List<String> indicates the list of keys in the format
- * "dstAddr-srcAddr-protocol-dstPort-srcPort"
- */
- public static List<String> reverseKey(List<String> keys) {
- Assert.notEmpty(keys, "'keys' must not be null or empty");
- List<String> reverseKeys = new ArrayList<String>();
- for (String key : keys) {
- if (key != null) {
- String reverseKey = reverseKey(key);
- if (StringUtils.isNotEmpty(reverseKey)) {
- reverseKeys.add(reverseKey);
- }
- }
- }
- return reverseKeys;
- }
-
- /**
- * Returns Comparator for sorting pcaps cells based on the timestamp (dsc).
- *
- * @return CellTimestampComparator
- */
- public static CellTimestampComparator getCellTimestampComparator() {
- return CELL_TIMESTAMP_COMPARATOR;
- }
-
- /**
- * Convert millis to data creation time unit.
- *
- * @param inputTime
- * the input time
- * @return the long
- */
- @VisibleForTesting
- private static long convertMillisToDataCreationTimeUnit(long inputTime) {
- System.out.println("convert Millis To DataCreation TimeUnit");
- TimeUnit dataCreationTimeUnit = getDataCreationTimeUnit();
- if (TimeUnit.SECONDS == dataCreationTimeUnit) {
- return (inputTime / 1000);
- } else if (TimeUnit.MILLIS == dataCreationTimeUnit) {
- return inputTime;
- } else if (TimeUnit.MICROS == dataCreationTimeUnit) {
- return inputTime * 1000;
- }
- return inputTime;
- }
-
- /**
- * Convert micros to data creation time unit.
- *
- * @param inputTime
- * the input time
- * @return the long
- */
- @VisibleForTesting
- private static long convertMicrosToDataCreationTimeUnit(long inputTime) {
- System.out.println("convert Micros To DataCreation TimeUnit");
- TimeUnit dataCreationTimeUnit = getDataCreationTimeUnit();
- if (TimeUnit.SECONDS == dataCreationTimeUnit) {
- return inputTime / (1000 * 1000);
- } else if (TimeUnit.MILLIS == dataCreationTimeUnit) {
- return inputTime / 1000;
- } else if (TimeUnit.MICROS == dataCreationTimeUnit) {
- return inputTime;
- }
- return inputTime;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
index ce3cec9..1d0beb8 100644
--- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
+++ b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
@@ -20,158 +20,83 @@ package org.apache.metron.pcapservice;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.EnumMap;
import java.util.List;
import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.QueryParam;
+import javax.ws.rs.*;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import com.google.common.base.Joiner;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Logger;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.metron.pcap.utils.PcapUtils;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.pcap.mr.PcapJob;
@Path("/")
public class PcapReceiverImplRestEasy {
- /** The Constant LOGGER. */
- private static final Logger LOGGER = Logger
- .getLogger(PcapReceiverImplRestEasy.class);
-
- /** The Constant HEADER_CONTENT_DISPOSITION_NAME. */
- private static final String HEADER_CONTENT_DISPOSITION_NAME = "Content-Disposition";
-
- /** The Constant HEADER_CONTENT_DISPOSITION_VALUE. */
- private static final String HEADER_CONTENT_DISPOSITION_VALUE = "attachment; filename=\"managed-threat.pcap\"";
-
- /** partial response key header name. */
- private static final String HEADER_PARTIAL_RESPONE_KEY = "lastRowKey";
-
- @GET
- @Path("pcapGetter/getPcapsByKeys")
- public Response getPcapsByKeys(
- @QueryParam("keys") List<String> keys,
- @QueryParam("lastRowKey") String lastRowKey,
- @DefaultValue("-1") @QueryParam("startTime") long startTime,
- @DefaultValue("-1") @QueryParam("endTime") long endTime,
- @QueryParam("includeDuplicateLastRow") boolean includeDuplicateLastRow,
- @QueryParam("includeReverseTraffic") boolean includeReverseTraffic,
- @QueryParam("maxResponseSize") String maxResponseSize,
- @Context HttpServletResponse response) throws IOException {
- PcapsResponse pcapResponse = null;
-
- if (keys == null || keys.size() == 0)
- return Response.serverError().status(Response.Status.NO_CONTENT)
- .entity("'keys' must not be null or empty").build();
-
- try {
- IPcapGetter pcapGetter = PcapGetterHBaseImpl.getInstance();
- pcapResponse = pcapGetter.getPcaps(parseKeys(keys), lastRowKey,
- startTime, endTime, includeReverseTraffic,
- includeDuplicateLastRow,
- ConfigurationUtil.validateMaxResultSize(maxResponseSize));
- LOGGER.info("pcaps response in REST layer ="
- + pcapResponse.toString());
-
- // return http status '204 No Content' if the pcaps response size is
- // 0
- if (pcapResponse == null || pcapResponse.getResponseSize() == 0) {
-
- return Response.status(Response.Status.NO_CONTENT).build();
- }
-
- // return http status '206 Partial Content', the partial response
- // file and
- // 'lastRowKey' header , if the pcaps response status is 'PARTIAL'
-
- response.setHeader(HEADER_CONTENT_DISPOSITION_NAME,
- HEADER_CONTENT_DISPOSITION_VALUE);
-
- if (pcapResponse.getStatus() == PcapsResponse.Status.PARTIAL) {
-
- response.setHeader(HEADER_PARTIAL_RESPONE_KEY,
- pcapResponse.getLastRowKey());
-
- return Response
- .ok(pcapResponse.getPcaps(),
- MediaType.APPLICATION_OCTET_STREAM).status(206)
- .build();
-
- }
-
- } catch (IOException e) {
- LOGGER.error(
- "Exception occurred while fetching Pcaps for the keys :"
- + keys.toString(), e);
- throw e;
- }
-
- // return http status '200 OK' along with the complete pcaps response
- // file,
- // and headers
- // return new ResponseEntity<byte[]>(pcapResponse.getPcaps(), headers,
- // HttpStatus.OK);
-
- return Response
- .ok(pcapResponse.getPcaps(), MediaType.APPLICATION_OCTET_STREAM)
- .status(200).build();
-
- }
-
-
- @GET
- @Path("/pcapGetter/getPcapsByKeyRange")
-
- public Response getPcapsByKeyRange(
- @QueryParam("startKey") String startKey,
- @QueryParam("endKey")String endKey,
- @QueryParam("maxResponseSize") String maxResponseSize,
- @DefaultValue("-1") @QueryParam("startTime")long startTime,
- @DefaultValue("-1") @QueryParam("endTime") long endTime,
- @Context HttpServletResponse servlet_response) throws IOException {
-
- if (startKey == null || startKey.equals(""))
- return Response.serverError().status(Response.Status.NO_CONTENT)
- .entity("'start key' must not be null or empty").build();
-
- if (startKey == null || startKey.equals(""))
- return Response.serverError().status(Response.Status.NO_CONTENT)
- .entity("'end key' must not be null or empty").build();
-
-
- byte[] response = null;
- try {
- IPcapScanner pcapScanner = PcapScannerHBaseImpl.getInstance();
- response = pcapScanner.getPcaps(startKey, endKey,
- ConfigurationUtil.validateMaxResultSize(maxResponseSize), startTime,
- endTime);
- if (response == null || response.length == 0) {
-
- return Response.status(Response.Status.NO_CONTENT).build();
-
- }
- servlet_response.setHeader(HEADER_CONTENT_DISPOSITION_NAME,
- HEADER_CONTENT_DISPOSITION_VALUE);
-
- } catch (IOException e) {
- LOGGER.error(
- "Exception occurred while fetching Pcaps for the key range : startKey="
- + startKey + ", endKey=" + endKey, e);
- throw e;
- }
- // return http status '200 OK' along with the complete pcaps response file,
- // and headers
-
- return Response
- .ok(response, MediaType.APPLICATION_OCTET_STREAM)
- .status(200).build();
- }
+ /** The Constant LOGGER. */
+ private static final Logger LOGGER = Logger
+ .getLogger(PcapReceiverImplRestEasy.class);
+
+ /** The Constant HEADER_CONTENT_DISPOSITION_NAME. */
+ private static final String HEADER_CONTENT_DISPOSITION_NAME = "Content-Disposition";
+
+ /** The Constant HEADER_CONTENT_DISPOSITION_VALUE. */
+ private static final String HEADER_CONTENT_DISPOSITION_VALUE = "attachment; filename=\"managed-threat.pcap\"";
+
+ /** partial response key header name. */
+ private static final String HEADER_PARTIAL_RESPONE_KEY = "lastRowKey";
+ private static ThreadLocal<Configuration> CONFIGURATION = new ThreadLocal<Configuration>() {
+ /**
+ * Returns the current thread's "initial value" for this
+ * thread-local variable. This method will be invoked the first
+ * time a thread accesses the variable with the {@link #get}
+ * method, unless the thread previously invoked the {@link #set}
+ * method, in which case the {@code initialValue} method will not
+ * be invoked for the thread. Normally, this method is invoked at
+ * most once per thread, but it may be invoked again in case of
+ * subsequent invocations of {@link #remove} followed by {@link #get}.
+ * <p>
+ * <p>This implementation simply returns {@code null}; if the
+ * programmer desires thread-local variables to have an initial
+ * value other than {@code null}, {@code ThreadLocal} must be
+ * subclassed, and this method overridden. Typically, an
+ * anonymous inner class will be used.
+ *
+ * @return the initial value for this thread-local
+ */
+ @Override
+ protected Configuration initialValue() {
+ return new Configuration();
+ }
+ };
+ PcapJob queryUtil = new PcapJob();
+
+ protected PcapJob getQueryUtil() {
+ return queryUtil;
+ }
+
+ private static boolean isValidPort(String port) {
+ if( port != null && !port.equals("") ) {
+ try {
+ Integer.parseInt(port);
+ return true;
+ }
+ catch(Exception e) {
+ return false;
+ }
+ }
+ return false;
+ }
/*
* (non-Javadoc)
@@ -182,86 +107,104 @@ public class PcapReceiverImplRestEasy {
* java.lang.String, long, long, boolean,
* javax.servlet.http.HttpServletResponse)
*/
-
- @GET
- @Path("/pcapGetter/getPcapsByIdentifiers")
-
- public Response getPcapsByIdentifiers(
- @QueryParam ("srcIp") String srcIp,
- @QueryParam ("dstIp") String dstIp,
- @QueryParam ("protocol") String protocol,
- @QueryParam ("srcPort") String srcPort,
- @QueryParam ("dstPort") String dstPort,
- @DefaultValue("-1") @QueryParam ("startTime")long startTime,
- @DefaultValue("-1") @QueryParam ("endTime")long endTime,
- @DefaultValue("false") @QueryParam ("includeReverseTraffic") boolean includeReverseTraffic,
- @Context HttpServletResponse servlet_response)
-
- throws IOException {
-
- if (srcIp == null || srcIp.equals(""))
- return Response.serverError().status(Response.Status.NO_CONTENT)
- .entity("'srcIp' must not be null or empty").build();
-
- if (dstIp == null || dstIp.equals(""))
- return Response.serverError().status(Response.Status.NO_CONTENT)
- .entity("'dstIp' must not be null or empty").build();
-
- if (protocol == null || protocol.equals(""))
- return Response.serverError().status(Response.Status.NO_CONTENT)
- .entity("'protocol' must not be null or empty").build();
-
- if (srcPort == null || srcPort.equals(""))
- return Response.serverError().status(Response.Status.NO_CONTENT)
- .entity("'srcPort' must not be null or empty").build();
-
- if (dstPort == null || dstPort.equals(""))
- return Response.serverError().status(Response.Status.NO_CONTENT)
- .entity("'dstPort' must not be null or empty").build();
-
-
- PcapsResponse response = null;
- try {
- String sessionKey = PcapUtils.getPartialSessionKey(srcIp, dstIp, protocol,
- srcPort, dstPort);
- LOGGER.info("sessionKey =" + sessionKey);
- IPcapGetter pcapGetter = PcapGetterHBaseImpl.getInstance();
- response = pcapGetter.getPcaps(Arrays.asList(sessionKey), null,
- startTime, endTime, includeReverseTraffic, false,
- ConfigurationUtil.getDefaultResultSize());
- if (response == null || response.getResponseSize() == 0) {
- return Response.status(Response.Status.NO_CONTENT).build();
- }
- servlet_response.setHeader(HEADER_CONTENT_DISPOSITION_NAME,
- HEADER_CONTENT_DISPOSITION_VALUE);
-
- } catch (IOException e) {
- LOGGER.error("Exception occurred while fetching Pcaps by identifiers :",
- e);
- throw e;
- }
- // return http status '200 OK' along with the complete pcaps response file,
- // and headers
- return Response
- .ok(response.getPcaps(), MediaType.APPLICATION_OCTET_STREAM)
- .status(200).build();
- }
- /**
- * This method parses the each value in the List using delimiter ',' and
- * builds a new List;.
- *
- * @param keys
- * list of keys to be parsed
- * @return list of keys
- */
- @VisibleForTesting
- List<String> parseKeys(List<String> keys) {
- // Assert.notEmpty(keys);
- List<String> parsedKeys = new ArrayList<String>();
- for (String key : keys) {
- parsedKeys.addAll(Arrays.asList(StringUtils.split(
- StringUtils.trim(key), ",")));
- }
- return parsedKeys;
- }
+ @GET
+ @Path("/pcapGetter/getPcapsByIdentifiers")
+
+ public Response getPcapsByIdentifiers(
+ @QueryParam ("srcIp") String srcIp,
+ @QueryParam ("dstIp") String dstIp,
+ @QueryParam ("protocol") String protocol,
+ @QueryParam ("srcPort") String srcPort,
+ @QueryParam ("dstPort") String dstPort,
+ @DefaultValue("-1") @QueryParam ("startTime")long startTime,
+ @DefaultValue("-1") @QueryParam ("endTime")long endTime,
+ @DefaultValue("false") @QueryParam ("includeReverseTraffic") boolean includeReverseTraffic,
+ @Context HttpServletResponse servlet_response)
+
+ throws IOException {
+
+ if (!isValidPort(srcPort)) {
+ return Response.serverError().status(Response.Status.NO_CONTENT)
+ .entity("'srcPort' must not be null, empty or a non-integer").build();
+ }
+
+ if (!isValidPort(dstPort)) {
+ return Response.serverError().status(Response.Status.NO_CONTENT)
+ .entity("'dstPort' must not be null, empty or a non-integer").build();
+ }
+
+ final boolean includeReverseTrafficF = includeReverseTraffic;
+ PcapsResponse response = new PcapsResponse();
+ try {
+ if(startTime < 0) {
+ startTime = 0L;
+ }
+ if(endTime < 0) {
+ endTime = System.currentTimeMillis();
+ }
+
+ //convert to nanoseconds since the epoch
+ startTime = TimestampConverters.MILLISECONDS.toNanoseconds(startTime);
+ endTime = TimestampConverters.MILLISECONDS.toNanoseconds(endTime);
+ EnumMap<Constants.Fields, String> query = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+ if(srcIp != null) {
+ put(Constants.Fields.SRC_ADDR, srcIp);
+ }
+ if(dstIp != null) {
+ put(Constants.Fields.DST_ADDR, dstIp);
+ }
+ if(srcPort != null) {
+ put(Constants.Fields.SRC_PORT, srcPort);
+ }
+ if(dstPort != null) {
+ put(Constants.Fields.DST_PORT, dstPort);
+ }
+ if(protocol != null) {
+ put(Constants.Fields.PROTOCOL, protocol);
+ }
+ put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "" + includeReverseTrafficF);
+ }};
+ if(LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Query received: " + Joiner.on(",").join(query.entrySet()));
+ }
+ response.setPcaps(getQueryUtil().query(new org.apache.hadoop.fs.Path(ConfigurationUtil.getPcapOutputPath())
+ , new org.apache.hadoop.fs.Path(ConfigurationUtil.getTempQueryOutputPath())
+ , startTime
+ , endTime
+ , query
+ , CONFIGURATION.get()
+ , FileSystem.get(CONFIGURATION.get())
+ )
+ );
+
+ } catch (Exception e) {
+ LOGGER.error("Exception occurred while fetching Pcaps by identifiers :",
+ e);
+ throw new WebApplicationException("Unable to fetch Pcaps via MR job", e);
+ }
+
+ // return http status '200 OK' along with the complete pcaps response file,
+ // and headers
+ return Response
+ .ok(response.getPcaps(), MediaType.APPLICATION_OCTET_STREAM)
+ .status(200).build();
+ }
+ /**
+ * This method parses the each value in the List using delimiter ',' and
+ * builds a new List;.
+ *
+ * @param keys
+ * list of keys to be parsed
+ * @return list of keys
+ */
+ @VisibleForTesting
+ List<String> parseKeys(List<String> keys) {
+ // Assert.notEmpty(keys);
+ List<String> parsedKeys = new ArrayList<String>();
+ for (String key : keys) {
+ parsedKeys.addAll(Arrays.asList(StringUtils.split(
+ StringUtils.trim(key), ",")));
+ }
+ return parsedKeys;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapScannerHBaseImpl.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapScannerHBaseImpl.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapScannerHBaseImpl.java
deleted file mode 100644
index f163408..0000000
--- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapScannerHBaseImpl.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.pcapservice;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.NoServerForRegionException;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.log4j.Logger;
-import org.springframework.util.Assert;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.metron.pcap.PcapMerger;
-
-/**
- * Singleton class which integrates with HBase table and returns sorted pcaps
- * based on the timestamp for the given range of keys. Creates HConnection if it
- * is not already created and the same connection instance is being used for all
- * requests
- *
- * @author sheetal
- * @version $Revision: 1.0 $
- */
-public class PcapScannerHBaseImpl implements IPcapScanner {
-
- /** The Constant LOGGER. */
- private static final Logger LOGGER = Logger
- .getLogger(PcapScannerHBaseImpl.class);
-
- /** The Constant DEFAULT_HCONNECTION_RETRY_LIMIT. */
- private static final int DEFAULT_HCONNECTION_RETRY_LIMIT = 0;
-
- /** The pcap scanner h base. */
- private static IPcapScanner pcapScannerHBase = null;
-
- /*
- * (non-Javadoc)
- *
- * @see com.cisco.opensoc.hbase.client.IPcapScanner#getPcaps(java.lang.String,
- * java.lang.String, long, long, long)
- */
-
- public byte[] getPcaps(String startKey, String endKey, long maxResultSize,
- long startTime, long endTime) throws IOException {
- Assert.hasText(startKey, "startKey must no be null or empty");
- byte[] cf = Bytes.toBytes(ConfigurationUtil.getConfiguration()
- .getString("hbase.table.column.family"));
- byte[] cq = Bytes.toBytes(ConfigurationUtil.getConfiguration()
- .getString("hbase.table.column.qualifier"));
- // create scan request
- Scan scan = createScanRequest(cf, cq, startKey, endKey, maxResultSize,
- startTime, endTime);
- List<byte[]> pcaps = new ArrayList<byte[]>();
- HTable table = null;
- try {
- pcaps = scanPcaps(pcaps, table, scan, cf, cq);
- } catch (IOException e) {
- LOGGER.error(
- "Exception occurred while fetching Pcaps for the key range : startKey="
- + startKey + ", endKey=" + endKey, e);
- if (e instanceof ZooKeeperConnectionException
- || e instanceof MasterNotRunningException
- || e instanceof NoServerForRegionException) {
- int maxRetryLimit = getConnectionRetryLimit();
- for (int attempt = 1; attempt <= maxRetryLimit; attempt++) {
- try {
- HBaseConfigurationUtil.closeConnection(); // closing the existing
- // connection and retry,
- // it will create a new
- // HConnection
- pcaps = scanPcaps(pcaps, table, scan, cf, cq);
- break;
- } catch (IOException ie) {
- if (attempt == maxRetryLimit) {
- System.out.println("Throwing the exception after retrying "
- + maxRetryLimit + " times.");
- throw e;
- }
- }
- }
- } else {
- throw e;
- }
- } finally {
- if (table != null) {
- table.close();
- }
- }
- if (pcaps.size() == 1) {
- return pcaps.get(0);
- }
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PcapMerger.merge(baos, pcaps);
- byte[] response = baos.toByteArray();
- return response;
- }
-
- /**
- * Creates the scan request.
- *
- * @param cf
- * the cf
- * @param cq
- * the cq
- * @param startKey
- * the start key
- * @param endKey
- * the end key
- * @param maxResultSize
- * the max result size
- * @param startTime
- * the start time
- * @param endTime
- * the end time
- * @return the scan
- * @throws IOException
- * Signals that an I/O exception has occurred.
- */
- @VisibleForTesting
- Scan createScanRequest(byte[] cf, byte[] cq, String startKey, String endKey,
- long maxResultSize, long startTime, long endTime) throws IOException {
- Scan scan = new Scan();
- scan.addColumn(cf, cq);
- scan.setMaxVersions(ConfigurationUtil.getConfiguration().getInt(
- "hbase.table.column.maxVersions"));
- scan.setStartRow(startKey.getBytes());
- if (endKey != null) {
- scan.setStopRow(endKey.getBytes());
- }
- scan.setMaxResultSize(maxResultSize);
- boolean setTimeRange = true;
- if (startTime < 0 && endTime < 0) {
- setTimeRange = false;
- }
- if (setTimeRange) {
- if (startTime < 0) {
- startTime = 0;
- } else {
- startTime = PcapHelper.convertToDataCreationTimeUnit(startTime);
- }
- if (endTime < 0) {
- endTime = Long.MAX_VALUE;
- } else {
- endTime = PcapHelper.convertToDataCreationTimeUnit(endTime);
- }
- Assert.isTrue(startTime < endTime,
- "startTime value must be less than endTime value");
- }
- // create Scan request;
- if (setTimeRange) {
- scan.setTimeRange(startTime, endTime);
- }
- return scan;
- }
-
- /**
- * Scan pcaps.
- *
- * @param pcaps
- * the pcaps
- * @param table
- * the table
- * @param scan
- * the scan
- * @param cf
- * the cf
- * @param cq
- * the cq
- * @return the list
- * @throws IOException
- * Signals that an I/O exception has occurred.
- */
- @VisibleForTesting
- List<byte[]> scanPcaps(List<byte[]> pcaps, HTable table, Scan scan,
- byte[] cf, byte[] cq) throws IOException {
- LOGGER.info("Scan =" + scan.toString());
- table = (HTable) HBaseConfigurationUtil.getConnection().getTable(
- ConfigurationUtil.getConfiguration().getString("hbase.table.name"));
- ResultScanner resultScanner = table.getScanner(scan);
- List<Cell> scannedCells = new ArrayList<Cell>();
- for (Result result = resultScanner.next(); result != null; result = resultScanner
- .next()) {
- List<Cell> cells = result.getColumnCells(cf, cq);
- if (cells != null) {
- for (Cell cell : cells) {
- scannedCells.add(cell);
- }
- }
- }
- Collections.sort(scannedCells, PcapHelper.getCellTimestampComparator());
- LOGGER.info("sorted cells :" + scannedCells.toString());
- for (Cell sortedCell : scannedCells) {
- pcaps.add(CellUtil.cloneValue(sortedCell));
- }
- return pcaps;
- }
-
- /**
- * Gets the connection retry limit.
- *
- * @return the connection retry limit
- */
- private int getConnectionRetryLimit() {
- return ConfigurationUtil.getConfiguration().getInt(
- "hbase.hconnection.retries.number", DEFAULT_HCONNECTION_RETRY_LIMIT);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see com.cisco.opensoc.hbase.client.IPcapScanner#getPcaps(java.lang.String,
- * java.lang.String)
- */
-
- public byte[] getPcaps(String startKey, String endKey) throws IOException {
- Assert.hasText(startKey, "startKey must no be null or empty");
- Assert.hasText(endKey, "endKey must no be null or empty");
- return getPcaps(startKey, endKey, ConfigurationUtil.getDefaultResultSize(),
- -1, -1);
- }
-
- /**
- * Always returns the singleton instance.
- *
- * @return IPcapScanner singleton instance
- * @throws IOException
- * Signals that an I/O exception has occurred.
- */
- public static IPcapScanner getInstance() throws IOException {
- if (pcapScannerHBase == null) {
- synchronized (PcapScannerHBaseImpl.class) {
- if (pcapScannerHBase == null) {
- pcapScannerHBase = new PcapScannerHBaseImpl();
- }
- }
- }
- return pcapScannerHBase;
- }
-
- /**
- * Instantiates a new pcap scanner h base impl.
- */
- private PcapScannerHBaseImpl() {
- }
-
- /**
- * The main method.
- */
- // public static void main(String[] args) throws IOException {
- // if (args == null || args.length < 3) {
- // usage();
- // return;
- // }
- // String outputFileName = null;
- // String startKey = null;
- // String stopKey = null;
- // outputFileName = args[0];
- // startKey = args[1];
- // if (args.length > 2) { // NOPMD by sheetal on 1/29/14 3:55 PM
- // stopKey = args[2];
- // }
- // PcapScannerHBaseImpl downloader = new PcapScannerHBaseImpl();
- // byte[] pcaps = downloader.getPcaps(startKey, stopKey, defaultResultSize, 0,
- // Long.MAX_VALUE);
- // File file = new File(outputFileName);
- // FileUtils.write(file, "", false);
- // ByteArrayOutputStream baos = new ByteArrayOutputStream(); //
- // $codepro.audit.disable
- // // closeWhereCreated
- // PcapMerger.merge(baos, pcaps);
- // FileUtils.writeByteArrayToFile(file, baos.toByteArray(), true);
- // }
-
- /**
- * Usage.
- */
- @SuppressWarnings("unused")
- private static void usage() {
- System.out.println("java " + PcapScannerHBaseImpl.class.getName() // NOPMD
- // by
- // sheetal
- // <!-- //
- // $codepro.audit.disable
- // debuggingCode
- // -->
- // on
- // 1/29/14
- // 3:55
- // PM
- + " <zk quorum> <output file> <start key> [stop key]");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapsResponse.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapsResponse.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapsResponse.java
index b2fada1..a5f825d 100644
--- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapsResponse.java
+++ b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapsResponse.java
@@ -73,58 +73,6 @@ public class PcapsResponse {
this.pcaps.add(pcaps);
}
- /**
- * Gets the partial response key.
- *
- * @return the partial response key
- */
- public String getLastRowKey() {
- return lastRowKey;
- }
-
- /**
- * Sets the partial response key.
- *
- * @param lastRowKey
- * the last row key
- */
- public void setLastRowKey(String lastRowKey) {
- this.lastRowKey = lastRowKey;
- }
-
- /**
- * Gets the status.
- *
- * @return the status
- */
- public Status getStatus() {
- return status;
- }
-
- /**
- * Sets the status.
- *
- * @param status
- * the new status
- */
- public void setStatus(Status status) {
- this.status = status;
- }
-
- /**
- * Checks if is resonse size within limit.
- *
- * @param maxResultSize
- * the max result size
- * @return true, if is resonse size within limit
- */
- public boolean isResonseSizeWithinLimit(long maxResultSize) {
- // System.out.println("isResonseSizeWithinLimit() : getResponseSize() < (input|default result size - maximum packet size ) ="+
- // getResponseSize()+ " < " + ( maxResultSize
- // -ConfigurationUtil.getMaxRowSize()));
- return getResponseSize() < (maxResultSize - ConfigurationUtil
- .getMaxRowSize());
- }
/**
* Gets the response size.
@@ -147,6 +95,9 @@ public class PcapsResponse {
* Signals that an I/O exception has occurred.
*/
public byte[] getPcaps() throws IOException {
+ if(pcaps == null) {
+ return new byte[] {};
+ }
if (pcaps.size() == 1) {
return pcaps.get(0);
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/rest/JettyServiceRunner.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/rest/JettyServiceRunner.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/rest/JettyServiceRunner.java
index f4fb27c..2a930b8 100644
--- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/rest/JettyServiceRunner.java
+++ b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/rest/JettyServiceRunner.java
@@ -17,12 +17,13 @@
*/
package org.apache.metron.pcapservice.rest;
+import org.apache.metron.pcapservice.PcapReceiverImplRestEasy;
+
import java.util.HashSet;
import java.util.Set;
import javax.ws.rs.core.Application;
-import org.apache.metron.pcapservice.PcapReceiverImplRestEasy;
public class JettyServiceRunner extends Application {
@@ -31,7 +32,7 @@ public class JettyServiceRunner extends Application {
public JettyServiceRunner() {
// initialize restful services
- services.add(new PcapReceiverImplRestEasy());
+ services.add(new PcapReceiverImplRestEasy());
}
@Override
public Set getSingletons() {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/rest/PcapService.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/rest/PcapService.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/rest/PcapService.java
index b5832b6..6773acb 100644
--- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/rest/PcapService.java
+++ b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/rest/PcapService.java
@@ -19,33 +19,39 @@ package org.apache.metron.pcapservice.rest;
import java.io.IOException;
+import org.apache.log4j.Logger;
+import org.apache.metron.api.helper.service.PcapServiceCli;
+import org.apache.metron.pcapservice.ConfigurationUtil;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.jboss.resteasy.plugins.server.servlet.HttpServletDispatcher;
-import org.apache.metron.api.helper.service.PcapServiceCli;
public class PcapService {
- public static void main(String[] args) throws IOException {
+ private static final Logger LOG = Logger.getLogger(PcapService.class);
+ public static void main(String[] args) throws IOException {
- PcapServiceCli cli = new PcapServiceCli(args);
- cli.parse();
-
- Server server = new Server(cli.getPort());
- ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
- context.setContextPath("/");
- ServletHolder h = new ServletHolder(new HttpServletDispatcher());
- h.setInitParameter("javax.ws.rs.Application", "org.apache.metron.pcapservice.rest.JettyServiceRunner");
- context.addServlet(h, "/*");
- server.setHandler(context);
- try {
- server.start();
- server.join();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
+ PcapServiceCli cli = new PcapServiceCli(args);
+ cli.parse();
+ ConfigurationUtil.setPcapOutputPath(cli.getPcapHdfsPath());
+ LOG.info("Pcap location set to " + cli.getPcapHdfsPath());
+ ConfigurationUtil.setTempQueryOutputPath(cli.getQueryHdfsPath());
+ LOG.info("Query temp location set to " + cli.getQueryHdfsPath());
+ Server server = new Server(cli.getPort());
+ ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+ context.setContextPath("/");
+ ServletHolder h = new ServletHolder(new HttpServletDispatcher());
+ h.setInitParameter("javax.ws.rs.Application", "org.apache.metron.pcapservice.rest.JettyServiceRunner");
+ context.addServlet(h, "/*");
+ server.setHandler(context);
+ try {
+ server.start();
+ server.join();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-api/src/main/resources/config-definition-hbase.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/main/resources/config-definition-hbase.xml b/metron-platform/metron-api/src/main/resources/config-definition-hbase.xml
deleted file mode 100644
index 98ece42..0000000
--- a/metron-platform/metron-api/src/main/resources/config-definition-hbase.xml
+++ /dev/null
@@ -1,50 +0,0 @@
-<?xml version="1.0" encoding="ISO-8859-1" ?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-<configuration>
- <header>
- <result delimiterParsingDisabled="true" forceReloadCheck="true"></result>
- <lookups>
- <lookup config-prefix="expr"
- config-class="org.apache.commons.configuration.interpol.ExprLookup">
- <variables>
- <variable name="System" value="Class:java.lang.System"/>
- <variable name="net" value="Class:java.net.InetAddress"/>
- <variable name="String" value="Class:org.apache.commons.lang.StringUtils"/>
- </variables>
- </lookup>
- </lookups>
- </header>
- <override>
- <!-- 1. properties from 'hbae-config.properties' are loaded first;
- if a property is not present in this file, then it will search in the files in the order they are defined here.
- 2. 'refreshDelay' indicates the minimum delay in milliseconds between checks to see if the underlying file is changed.
- 3. 'config-optional' indicates this file is not required -->
-
- <properties fileName="${expr:System.getProperty('configPath')+'/hbase-config.properties'}" config-optional="true">
- <reloadingStrategy refreshDelay="${expr:System.getProperty('configRefreshDelay')}"
- config-class="org.apache.commons.configuration.reloading.FileChangedReloadingStrategy"/>
- </properties>
-
- <properties fileName="hbase-config-default.properties" config-optional="true">
-<!-- <reloadingStrategy refreshDelay="${expr:System.getProperty('defaultConfigRefreshDelay')}"
- config-class="org.apache.commons.configuration.reloading.FileChangedReloadingStrategy"/>
- --> </properties>
-
- </override>
-</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-api/src/main/resources/hbase-config-default.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/main/resources/hbase-config-default.properties b/metron-platform/metron-api/src/main/resources/hbase-config-default.properties
deleted file mode 100644
index 0f47193..0000000
--- a/metron-platform/metron-api/src/main/resources/hbase-config-default.properties
+++ /dev/null
@@ -1,57 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-#hbase zoo keeper configuration
-hbase.zookeeper.quorum=zkpr1,zkpr2,zkpr3
-hbase.zookeeper.clientPort=2181
-hbase.client.retries.number=1
-zookeeper.session.timeout=60000
-zookeeper.recovery.retry=0
-
-#hbase table configuration
-hbase.table.name=pcap
-hbase.table.column.family=t
-hbase.table.column.qualifier=value
-hbase.table.column.maxVersions=5
-
-# scan size limit configuration in MB or KB; if the input is negative or greater than max value throw an error.
-hbase.scan.result.size.unit=MB
-hbase.scan.default.result.size=6
-hbase.scan.max.result.size=60
-
-# time stamp conversion configuration; possible values 'SECONDS'(seconds), 'MILLIS'(milli seconds), 'MICROS' (micro seconds)
-hbase.table.data.time.unit=MILLIS
-
-#number of retries in case of ZooKeeper or HBase server down
-hbase.hconnection.retries.number=3
-
-#configuration for including pcaps in the reverse traffic
-pcaps.include.reverse.traffic = false
-
-#maximum table row size in KB or MB
-hbase.table.row.size.unit = KB
-hbase.table.max.row.size = 70
-
-# tokens of row key configuration
-hbase.table.row.key.tokens=7
-rest.api.input.key.min.tokens=5
-
-# whether or not to include the last row from the previous request, applicable for only partial response scenario
-hbase.table.scan.include.duplicate.lastrow= true;
-
-#number of digits for appending tokens of the row key
-hbase.table.row.key.token.appending.digits=5
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/CellTimestampComparatorTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/CellTimestampComparatorTest.java b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/CellTimestampComparatorTest.java
deleted file mode 100644
index 9d10c92..0000000
--- a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/CellTimestampComparatorTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.pcapservice;
-
-import junit.framework.Assert;
-
-import org.apache.hadoop.hbase.Cell;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import org.apache.metron.pcapservice.CellTimestampComparator;
-
-/**
- * The Class CellTimestampComparatorTest.
- */
-public class CellTimestampComparatorTest {
-
- /**
- * Sets the up.
- *
- * @throws Exception
- * the exception
- */
- @Before
- public void setUp() throws Exception {
- }
-
- /**
- * Tear down.
- *
- * @throws Exception
- * the exception
- */
- @After
- public void tearDown() throws Exception {
- }
-
- /**
- * Test_less.
- */
- @Test
- public void test_less() {
- // mocking
- Cell cell1 = Mockito.mock(Cell.class);
- Mockito.when(cell1.getTimestamp()).thenReturn(13945345808L);
- Cell cell2 = Mockito.mock(Cell.class);
- Mockito.when(cell2.getTimestamp()).thenReturn(13845345808L);
-
- CellTimestampComparator comparator = new CellTimestampComparator();
-
- // actual call and verify
- Assert.assertTrue(comparator.compare(cell2, cell1) == -1);
-
- }
-
- /**
- * Test_greater.
- */
- @Test
- public void test_greater() {
- // mocking
- Cell cell1 = Mockito.mock(Cell.class);
- Mockito.when(cell1.getTimestamp()).thenReturn(13745345808L);
- Cell cell2 = Mockito.mock(Cell.class);
- Mockito.when(cell2.getTimestamp()).thenReturn(13945345808L);
-
- CellTimestampComparator comparator = new CellTimestampComparator();
-
- // actual call and verify
- Assert.assertTrue(comparator.compare(cell2, cell1) == 1);
-
- }
-
- /**
- * Test_equal.
- */
- @Test
- public void test_equal() {
- // mocking
- Cell cell1 = Mockito.mock(Cell.class);
- Mockito.when(cell1.getTimestamp()).thenReturn(13945345808L);
- Cell cell2 = Mockito.mock(Cell.class);
- Mockito.when(cell2.getTimestamp()).thenReturn(13945345808L);
-
- CellTimestampComparator comparator = new CellTimestampComparator();
-
- // actual call and verify
- Assert.assertTrue(comparator.compare(cell2, cell1) == 0);
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/ConfigurationUtilTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/ConfigurationUtilTest.java b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/ConfigurationUtilTest.java
index 3424425..3c0a77b 100644
--- a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/ConfigurationUtilTest.java
+++ b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/ConfigurationUtilTest.java
@@ -17,51 +17,29 @@
*/
package org.apache.metron.pcapservice;
+import org.junit.Assert;
import org.junit.Test;
-import org.apache.metron.pcapservice.ConfigurationUtil;
-import org.apache.metron.pcapservice.ConfigurationUtil.SizeUnit;
-import org.springframework.util.Assert;
-/**
- * The Class ConfigurationUtilTest.
- */
-public class ConfigurationUtilTest {
- /**
- * Test_get max allowable result size in bytes.
- */
- @Test
- public void test_getMaxAllowableResultSizeInBytes() {
- long result = ConfigurationUtil.getMaxResultSize();
- Assert.isTrue(result == 62914560);
- }
+public class ConfigurationUtilTest {
- /**
- * Test_get max allowable results size unit.
- */
- @Test
- public void test_getMaxAllowableResultsSizeUnit() {
- SizeUnit result = ConfigurationUtil.getResultSizeUnit();
- Assert.isTrue(SizeUnit.MB == result);
- }
- /**
- * Test_get max row size in bytes.
- */
@Test
- public void test_getMaxRowSizeInBytes() {
- long result = ConfigurationUtil.getMaxRowSize();
- Assert.isTrue(result == 71680);
+ public void test_getPcapOutputPath() {
+ Assert.assertEquals(ConfigurationUtil.getPcapOutputPath(), null);
+ ConfigurationUtil.setPcapOutputPath("/foo");
+ Assert.assertEquals(ConfigurationUtil.getPcapOutputPath(), "/foo");
}
/**
- * Test_get max row size unit.
+ * Test_get max allowable results size unit.
*/
@Test
- public void test_getMaxRowSizeUnit() {
- SizeUnit result = ConfigurationUtil.getRowSizeUnit();
- Assert.isTrue(SizeUnit.KB == result);
+ public void test_getTempQueryDir() {
+ Assert.assertEquals(ConfigurationUtil.getTempQueryOutputPath(), null);
+ ConfigurationUtil.setTempQueryOutputPath("/tmp");
+ Assert.assertEquals(ConfigurationUtil.getTempQueryOutputPath(), "/tmp");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/HBaseConfigurationUtilTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/HBaseConfigurationUtilTest.java b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/HBaseConfigurationUtilTest.java
deleted file mode 100644
index 75ac782..0000000
--- a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/HBaseConfigurationUtilTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.pcapservice;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.springframework.util.Assert;
-
-import org.apache.metron.pcapservice.HBaseConfigurationUtil;
-
-/**
- * The Class HBaseConfigurationUtilTest.
- */
-public class HBaseConfigurationUtilTest {
-
- /**
- * Sets the up.
- *
- * @throws Exception
- * the exception
- */
- @Before
- public void setUp() throws Exception {
- }
-
- /**
- * Tear down.
- *
- * @throws Exception
- * the exception
- */
- @After
- public void tearDown() throws Exception {
- }
-
- /**
- * Test_read.
- *
- * @throws IOException
- * Signals that an I/O exception has occurred.
- */
- @Test
- public void test_read() throws IOException {
- Configuration configuration = HBaseConfigurationUtil.read();
- Assert.isTrue(configuration != null, "Configuration must not be null");
- Assert.isTrue(configuration.get("hbase.client.retries.number").equals("1"),
- "value must be equal");
- }
-
-}