You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2016/09/16 16:40:20 UTC
incubator-metron git commit: METRON-419 Update Tuple to HBase
Mapper/Bolt to Set TTL (nickwallen) closes apache/incubator-metron#252
Repository: incubator-metron
Updated Branches:
refs/heads/master c64e8ad03 -> 3a2ecc404
METRON-419 Update Tuple to HBase Mapper/Bolt to Set TTL (nickwallen) closes apache/incubator-metron#252
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/3a2ecc40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/3a2ecc40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/3a2ecc40
Branch: refs/heads/master
Commit: 3a2ecc404bdd8e6047dbe7199dee45192aa94428
Parents: c64e8ad
Author: nickwallen <ni...@nickallen.org>
Authored: Fri Sep 16 12:39:16 2016 -0400
Committer: Nick Allen <ni...@nickallen.org>
Committed: Fri Sep 16 12:39:16 2016 -0400
----------------------------------------------------------------------
.../metron/profiler/client/ProfileWriter.java | 3 +-
.../profiler/bolt/ProfileHBaseMapper.java | 23 ++-
.../metron/profiler/hbase/ColumnBuilder.java | 2 +-
.../profiler/hbase/ValueOnlyColumnBuilder.java | 2 +-
metron-platform/metron-hbase/pom.xml | 16 --
.../org/apache/metron/hbase/bolt/HBaseBolt.java | 26 +--
.../metron/hbase/bolt/mapper/ColumnList.java | 178 +++++++++++++++++++
.../metron/hbase/bolt/mapper/HBaseMapper.java | 56 ++++++
.../bolt/mapper/HBaseProjectionCriteria.java | 92 ++++++++++
.../metron/hbase/bolt/mapper/IColumn.java | 35 ++++
.../metron/hbase/bolt/mapper/ICounter.java | 34 ++++
.../apache/metron/hbase/client/HBaseClient.java | 6 +-
.../org/apache/metron/hbase/WidgetMapper.java | 22 ++-
.../apache/metron/hbase/bolt/HBaseBoltTest.java | 45 +++--
.../metron/hbase/client/HBaseClientTest.java | 6 +-
15 files changed, 492 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
index eb88b5a..0ad2f44 100644
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
@@ -22,13 +22,12 @@ package org.apache.metron.profiler.client;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.metron.hbase.bolt.mapper.ColumnList;
import org.apache.metron.hbase.client.HBaseClient;
import org.apache.metron.profiler.ProfileMeasurement;
import org.apache.metron.profiler.ProfilePeriod;
import org.apache.metron.profiler.hbase.ColumnBuilder;
import org.apache.metron.profiler.hbase.RowKeyBuilder;
-import org.apache.storm.hbase.common.ColumnList;
import java.util.List;
import java.util.concurrent.TimeUnit;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
index b0b33cc..2c8cb67 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
@@ -22,18 +22,21 @@ package org.apache.metron.profiler.bolt;
import backtype.storm.tuple.Tuple;
import org.apache.commons.beanutils.BeanMap;
+import org.apache.commons.lang.StringUtils;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
import org.apache.metron.common.dsl.ParseException;
+import org.apache.metron.hbase.bolt.mapper.HBaseMapper;
import org.apache.metron.profiler.ProfileMeasurement;
import org.apache.metron.profiler.hbase.ColumnBuilder;
import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
import org.apache.metron.profiler.hbase.RowKeyBuilder;
import org.apache.metron.profiler.stellar.StellarExecutor;
-import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
-import org.apache.storm.hbase.common.ColumnList;
+import org.apache.metron.hbase.bolt.mapper.ColumnList;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import static java.lang.String.format;
import static org.apache.commons.collections.CollectionUtils.isEmpty;
@@ -70,7 +73,7 @@ public class ProfileHBaseMapper implements HBaseMapper {
/**
* Define the row key for a ProfileMeasurement.
- * @param tuple The tuple containing a ProfileMeasurement.
+ * @param tuple The tuple to map to Hbase.
* @return The Hbase row key.
*/
@Override
@@ -82,7 +85,7 @@ public class ProfileHBaseMapper implements HBaseMapper {
/**
* Defines how the fields within a ProfileMeasurement are mapped to HBase.
- * @param tuple The tuple containing the ProfileMeasurement.
+ * @param tuple The tuple to map to Hbase.
*/
@Override
public ColumnList columns(Tuple tuple) {
@@ -91,6 +94,18 @@ public class ProfileHBaseMapper implements HBaseMapper {
}
/**
+ * The time-to-live can be defined differently for each profile.
+ * @param tuple The tuple to map to Hbase.
+ * @return
+ */
+ @Override
+ public Optional<Long> getTTL(Tuple tuple) {
+ // TTL not yet supported for profiles
+ Optional result = Optional.empty();
+ return result;
+ }
+
+ /**
* Executes each of the 'groupBy' expressions. The result of each
* expression are the groups used to sort the data as part of the
* row key.
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ColumnBuilder.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ColumnBuilder.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ColumnBuilder.java
index 44bf129..c645822 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ColumnBuilder.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ColumnBuilder.java
@@ -20,8 +20,8 @@
package org.apache.metron.profiler.hbase;
+import org.apache.metron.hbase.bolt.mapper.ColumnList;
import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.storm.hbase.common.ColumnList;
import java.io.Serializable;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
index cc6aa5a..bb1baf6 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
@@ -23,7 +23,7 @@ package org.apache.metron.profiler.hbase;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.metron.common.utils.SerDeUtils;
import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.storm.hbase.common.ColumnList;
+import org.apache.metron.hbase.bolt.mapper.ColumnList;
/**
* A ColumnBuilder that writes only the value of a ProfileMeasurement.
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-platform/metron-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/pom.xml b/metron-platform/metron-hbase/pom.xml
index 330b52c..c346325 100644
--- a/metron-platform/metron-hbase/pom.xml
+++ b/metron-platform/metron-hbase/pom.xml
@@ -115,22 +115,6 @@
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
- <artifactId>storm-hbase</artifactId>
- <version>${global_storm_version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <artifactId>servlet-api</artifactId>
- <groupId>javax.servlet</groupId>
- </exclusion>
- <exclusion>
- <artifactId>log4j-over-slf4j</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
<artifactId>storm-hdfs</artifactId>
<version>${global_storm_version}</version>
<scope>provided</scope>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
index a2da837..ede1d8c 100644
--- a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
+++ b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
@@ -29,19 +29,17 @@ import backtype.storm.tuple.Tuple;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Mutation;
import org.apache.metron.hbase.HTableProvider;
import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.hbase.bolt.mapper.ColumnList;
+import org.apache.metron.hbase.bolt.mapper.HBaseMapper;
import org.apache.metron.hbase.client.HBaseClient;
-import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
-import org.apache.storm.hbase.common.ColumnList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
+import java.util.Optional;
/**
* A bolt that writes to HBase.
@@ -158,10 +156,17 @@ public class HBaseBolt extends BaseRichBolt {
* @param tuple Contains the data elements that need written to HBase.
*/
private void save(Tuple tuple) {
- byte[] rowKey = this.mapper.rowKey(tuple);
- ColumnList cols = this.mapper.columns(tuple);
+ byte[] rowKey = mapper.rowKey(tuple);
+ ColumnList cols = mapper.columns(tuple);
Durability durability = writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL;
- hbaseClient.addMutation(rowKey, cols, durability);
+
+ Optional<Long> ttl = mapper.getTTL(tuple);
+ if(ttl.isPresent()) {
+ hbaseClient.addMutation(rowKey, cols, durability, ttl.get());
+ } else {
+ hbaseClient.addMutation(rowKey, cols, durability);
+ }
+
batchHelper.addBatch(tuple);
}
@@ -174,9 +179,8 @@ public class HBaseBolt extends BaseRichBolt {
}
/**
- *
- * @param connectorImpl
- * @return
+ * Creates a TableProvider based on a class name.
+ * @param connectorImpl The class name of a TableProvider
*/
private static TableProvider getTableProvider(String connectorImpl) {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/ColumnList.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/ColumnList.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/ColumnList.java
new file mode 100644
index 0000000..ad4f8d6
--- /dev/null
+++ b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/ColumnList.java
@@ -0,0 +1,178 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.hbase.bolt.mapper;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Represents a list of HBase columns.
+ *
+ * There are two types of columns, <i>standard</i> and <i>counter</i>.
+ *
+ * Standard columns have <i>column family</i> (required), <i>qualifier</i> (optional),
+ * <i>timestamp</i> (optional), and a <i>value</i> (optional) values.
+ *
+ * Counter columns have <i>column family</i> (required), <i>qualifier</i> (optional),
+ * and an <i>increment</i> (optional, but recommended) values.
+ *
+ * Inserts/Updates can be added via the <code>addColumn()</code> and <code>addCounter()</code>
+ * methods.
+ *
+ * Original code based on the Apache Storm project. See
+ * https://github.com/apache/storm/tree/master/external/storm-hbase.
+ */
+public class ColumnList {
+
+ public static abstract class AbstractColumn {
+ byte[] family, qualifier;
+
+ AbstractColumn(byte[] family, byte[] qualifier){
+ this.family = family;
+ this.qualifier = qualifier;
+ }
+
+ public byte[] getFamily() {
+ return family;
+ }
+
+ public byte[] getQualifier() {
+ return qualifier;
+ }
+ }
+
+ public static class Column extends AbstractColumn {
+ byte[] value;
+ long ts = -1;
+
+ Column(byte[] family, byte[] qualifier, long ts, byte[] value){
+ super(family, qualifier);
+ this.value = value;
+ this.ts = ts;
+ }
+
+ public byte[] getValue() {
+ return value;
+ }
+
+ public long getTs() {
+ return ts;
+ }
+ }
+
+ public static class Counter extends AbstractColumn {
+ long incr = 0;
+ Counter(byte[] family, byte[] qualifier, long incr){
+ super(family, qualifier);
+ this.incr = incr;
+ }
+
+ public long getIncrement() {
+ return incr;
+ }
+ }
+
+ private ArrayList<ColumnList.Column> columns;
+ private ArrayList<ColumnList.Counter> counters;
+
+ private ArrayList<Column> columns(){
+ if(this.columns == null){
+ this.columns = new ArrayList<>();
+ }
+ return this.columns;
+ }
+
+ private ArrayList<Counter> counters(){
+ if(this.counters == null){
+ this.counters = new ArrayList<>();
+ }
+ return this.counters;
+ }
+
+ /**
+ * Add a standard HBase column.
+ */
+ public ColumnList addColumn(byte[] family, byte[] qualifier, long ts, byte[] value){
+ columns().add(new Column(family, qualifier, ts, value));
+ return this;
+ }
+
+ /**
+ * Add a standard HBase column
+ */
+ public ColumnList addColumn(byte[] family, byte[] qualifier, byte[] value){
+ columns().add(new Column(family, qualifier, -1, value));
+ return this;
+ }
+
+ /**
+ * Add a standard HBase column given an instance of a class that implements
+ * the <code>IColumn</code> interface.
+ */
+ public ColumnList addColumn(IColumn column){
+ return this.addColumn(column.family(), column.qualifier(), column.timestamp(), column.value());
+ }
+
+ /**
+ * Add an HBase counter column.
+ */
+ public ColumnList addCounter(byte[] family, byte[] qualifier, long incr){
+ counters().add(new Counter(family, qualifier, incr));
+ return this;
+ }
+
+ /**
+ * Add an HBase counter column given an instance of a class that implements the
+ * <code>ICounter</code> interface.
+ */
+ public ColumnList addCounter(ICounter counter){
+ return this.addCounter(counter.family(), counter.qualifier(), counter.increment());
+ }
+
+
+ /**
+ * Query to determine if we have column definitions.
+ */
+ public boolean hasColumns(){
+ return this.columns != null;
+ }
+
+ /**
+ * Query to determine if we have counter definitions.
+ */
+ public boolean hasCounters(){
+ return this.counters != null;
+ }
+
+ /**
+ * Get the list of column definitions.
+ */
+ public List<Column> getColumns(){
+ return this.columns;
+ }
+
+ /**
+ * Get the list of counter definitions.
+ */
+ public List<Counter> getCounters(){
+ return this.counters;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/HBaseMapper.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/HBaseMapper.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/HBaseMapper.java
new file mode 100644
index 0000000..e662c76
--- /dev/null
+++ b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/HBaseMapper.java
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.hbase.bolt.mapper;
+
+import backtype.storm.tuple.Tuple;
+import java.io.Serializable;
+import java.util.Optional;
+
+/**
+ * Maps a <code>backtype.storm.tuple.Tuple</code> object
+ * to a row in an HBase table.
+ *
+ * Original code based on the Apache Storm project. See
+ * https://github.com/apache/storm/tree/master/external/storm-hbase.
+ */
+public interface HBaseMapper extends Serializable {
+
+ /**
+ * Given a tuple, return the HBase rowkey.
+ *
+ * @param tuple The tuple to map to Hbase
+ */
+ byte[] rowKey(Tuple tuple);
+
+ /**
+ * Given a tuple, return a list of HBase columns to insert.
+ *
+ * @param tuple The tuple to map to Hbase
+ */
+ ColumnList columns(Tuple tuple);
+
+ /**
+ * Given a tuple, return the time to live.
+ *
+ * @param tuple The tuple to map to Hbase
+ */
+ Optional<Long> getTTL(Tuple tuple);
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/HBaseProjectionCriteria.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/HBaseProjectionCriteria.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/HBaseProjectionCriteria.java
new file mode 100644
index 0000000..d3c8237
--- /dev/null
+++ b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/HBaseProjectionCriteria.java
@@ -0,0 +1,92 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.hbase.bolt.mapper;
+
+import com.google.common.collect.Lists;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Allows the user to specify the projection criteria.
+ * If only columnFamily is specified all columns from that family will be returned.
+ * If a column is specified only that column from that family will be returned.
+ *
+ * Original code based on the Apache Storm project. See
+ * https://github.com/apache/storm/tree/master/external/storm-hbase.
+ */
+public class HBaseProjectionCriteria implements Serializable {
+
+ private List<byte[]> columnFamilies;
+ private List<ColumnMetaData> columns;
+
+ public static class ColumnMetaData implements Serializable {
+
+ private byte[] columnFamily;
+ private byte[] qualifier;
+
+ public ColumnMetaData(String columnFamily, String qualifier) {
+ this.columnFamily = columnFamily.getBytes();
+ this.qualifier = qualifier.getBytes();
+ }
+
+ public byte[] getColumnFamily() {
+ return columnFamily;
+ }
+
+ public byte[] getQualifier() {
+ return qualifier;
+ }
+ }
+
+ public HBaseProjectionCriteria() {
+ columnFamilies = Lists.newArrayList();
+ columns = Lists.newArrayList();
+ }
+
+ /**
+ * all columns from this family will be included as result of HBase lookup.
+ * @param columnFamily
+ * @return
+ */
+ public HBaseProjectionCriteria addColumnFamily(String columnFamily) {
+ this.columnFamilies.add(columnFamily.getBytes());
+ return this;
+ }
+
+ /**
+ * Only this column from the the columnFamily will be included as result of HBase lookup.
+ * @param column
+ * @return
+ */
+ public HBaseProjectionCriteria addColumn(ColumnMetaData column) {
+ this.columns.add(column);
+ return this;
+ }
+
+ public List<ColumnMetaData> getColumns() {
+ return columns;
+ }
+
+ public List<byte[]> getColumnFamilies() {
+ return columnFamilies;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/IColumn.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/IColumn.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/IColumn.java
new file mode 100644
index 0000000..d5749ad
--- /dev/null
+++ b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/IColumn.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.hbase.bolt.mapper;
+
+/**
+ * Interface definition for classes that support being written to HBase as
+ * a regular column.
+ *
+ * Original code based on the Apache Storm project. See
+ * https://github.com/apache/storm/tree/master/external/storm-hbase.
+ */
+public interface IColumn {
+ byte[] family();
+ byte[] qualifier();
+ byte[] value();
+ long timestamp();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/ICounter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/ICounter.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/ICounter.java
new file mode 100644
index 0000000..03fc9fe
--- /dev/null
+++ b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/ICounter.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.hbase.bolt.mapper;
+
+/**
+ * Interface definition for classes that support being written to HBase as
+ * a counter column.
+ *
+ * Original code based on the Apache Storm project. See
+ * https://github.com/apache/storm/tree/master/external/storm-hbase.
+ */
+public interface ICounter {
+ byte[] family();
+ byte[] qualifier();
+ long increment();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/client/HBaseClient.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/client/HBaseClient.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/client/HBaseClient.java
index e078d50..ff4903d 100644
--- a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/client/HBaseClient.java
+++ b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/client/HBaseClient.java
@@ -29,8 +29,8 @@ import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.metron.hbase.TableProvider;
-import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
-import org.apache.storm.hbase.common.ColumnList;
+import org.apache.metron.hbase.bolt.mapper.ColumnList;
+import org.apache.metron.hbase.bolt.mapper.HBaseProjectionCriteria;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -106,7 +106,7 @@ public class HBaseClient implements Closeable {
* @param durability The durability of the mutation.
* @param timeToLiveMillis The time to live in milliseconds.
*/
- public void addMutation(byte[] rowKey, ColumnList cols, Durability durability, long timeToLiveMillis) {
+ public void addMutation(byte[] rowKey, ColumnList cols, Durability durability, Long timeToLiveMillis) {
if (cols.hasColumns()) {
Put put = createPut(rowKey, cols, durability, timeToLiveMillis);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/WidgetMapper.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/WidgetMapper.java b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/WidgetMapper.java
index c7d3fae..4f97a5b 100644
--- a/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/WidgetMapper.java
+++ b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/WidgetMapper.java
@@ -22,16 +22,27 @@ package org.apache.metron.hbase;
import backtype.storm.tuple.Tuple;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
-import org.apache.storm.hbase.common.ColumnList;
+import org.apache.metron.hbase.bolt.mapper.ColumnList;
+import org.apache.metron.hbase.bolt.mapper.HBaseMapper;
+
+import java.util.Optional;
-import java.util.Calendar;
/**
* Maps a Widget to HBase. Used only for testing.
*/
public class WidgetMapper implements HBaseMapper {
+ private Optional<Long> ttl;
+
+ public WidgetMapper() {
+ this.ttl = Optional.empty();
+ }
+
+ public WidgetMapper(Long ttl) {
+ this.ttl = Optional.of(ttl);
+ }
+
@Override
public byte[] rowKey(Tuple tuple) {
Widget w = (Widget) tuple.getValueByField("widget");
@@ -48,6 +59,11 @@ public class WidgetMapper implements HBaseMapper {
return cols;
}
+ @Override
+ public Optional<Long> getTTL(Tuple tuple) {
+ return ttl;
+ }
+
public static final String CF_STRING = "cfWidget";
public static final byte[] CF = Bytes.toBytes(CF_STRING);
public static final byte[] QNAME = Bytes.toBytes("qName");
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java
index 8b39aaa..621720e 100644
--- a/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java
+++ b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java
@@ -26,13 +26,13 @@ import org.apache.metron.hbase.Widget;
import org.apache.metron.hbase.WidgetMapper;
import org.apache.metron.hbase.client.HBaseClient;
import org.apache.metron.test.bolt.BaseBoltTest;
-import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import java.io.IOException;
import java.util.Collections;
-import java.util.List;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
@@ -52,7 +52,6 @@ public class HBaseBoltTest extends BaseBoltTest {
private Tuple tuple2;
private Widget widget1;
private Widget widget2;
- private HBaseMapper mapper;
@Before
public void setupTuples() throws Exception {
@@ -68,7 +67,6 @@ public class HBaseBoltTest extends BaseBoltTest {
@Before
public void setup() throws Exception {
- mapper = new WidgetMapper();
tuple1 = mock(Tuple.class);
tuple2 = mock(Tuple.class);
client = mock(HBaseClient.class);
@@ -77,7 +75,7 @@ public class HBaseBoltTest extends BaseBoltTest {
/**
* Create a ProfileBuilderBolt to test
*/
- private HBaseBolt createBolt(int batchSize) throws IOException {
+ private HBaseBolt createBolt(int batchSize, WidgetMapper mapper) throws IOException {
HBaseBolt bolt = new HBaseBolt(tableName, mapper)
.withBatchSize(batchSize);
bolt.prepare(Collections.emptyMap(), topologyContext, outputCollector);
@@ -86,17 +84,18 @@ public class HBaseBoltTest extends BaseBoltTest {
}
/**
- * What happens if the batch is full?
+ * What happens if the batch is ready to flush?
*
* If the batch size is 2 and we have received 2 tuples the batch should be flushed.
*/
@Test
public void testBatchReady() throws Exception {
- HBaseBolt bolt = createBolt(2);
+ HBaseBolt bolt = createBolt(2, new WidgetMapper());
bolt.execute(tuple1);
bolt.execute(tuple2);
// batch size is 2, received 2 tuples - flush the batch
+ verify(client, times(2)).addMutation(any(), any(), any());
verify(client, times(1)).mutate();
}
@@ -105,10 +104,11 @@ public class HBaseBoltTest extends BaseBoltTest {
*/
@Test
public void testBatchNotReady() throws Exception {
- HBaseBolt bolt = createBolt(2);
+ HBaseBolt bolt = createBolt(2, new WidgetMapper());
bolt.execute(tuple1);
- // batch size is 2, but only 1 tuple received - do not flush batch
+ // 1 put was added to the batch, but nothing was flushed
+ verify(client, times(1)).addMutation(any(), any(), any());
verify(client, times(0)).mutate();
}
@@ -117,10 +117,11 @@ public class HBaseBoltTest extends BaseBoltTest {
*/
@Test
public void testTimeFlush() throws Exception {
- HBaseBolt bolt = createBolt(2);
+ HBaseBolt bolt = createBolt(2, new WidgetMapper());
// the batch is not ready to write
bolt.execute(tuple1);
+ verify(client, times(1)).addMutation(any(), any(), any());
verify(client, times(0)).mutate();
// the batch should be flushed after the tick tuple
@@ -128,6 +129,30 @@ public class HBaseBoltTest extends BaseBoltTest {
verify(client, times(1)).mutate();
}
+ /**
+ * The mapper can define a TTL that the HBaseBolt uses to determine
+ * if the Put to Hbase needs the TTL set.
+ */
+ @Test
+ public void testWriteWithTTL() throws Exception {
+
+ // setup - create a mapper with a TTL set
+ final Long expectedTTL = 2000L;
+ WidgetMapper mapperWithTTL = new WidgetMapper(expectedTTL);
+
+ // execute
+ HBaseBolt bolt = createBolt(2, mapperWithTTL);
+ bolt.execute(tuple1);
+ bolt.execute(tuple2);
+
+ // used to grab the actual TTL
+ ArgumentCaptor<Long> ttlCaptor = ArgumentCaptor.forClass(Long.class);
+
+ // validate - ensure the Puts written with the TTL
+ verify(client, times(2)).addMutation(any(), any(), any(), ttlCaptor.capture());
+ Assert.assertEquals(expectedTTL, ttlCaptor.getValue());
+ }
+
private static Tuple mockTuple(String componentId, String streamId) {
Tuple tuple = mock(Tuple.class);
when(tuple.getSourceComponent()).thenReturn(componentId);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
index e2afd1c..00a1126 100644
--- a/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
+++ b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
@@ -30,9 +30,9 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.metron.hbase.Widget;
import org.apache.metron.hbase.WidgetMapper;
-import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
-import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
-import org.apache.storm.hbase.common.ColumnList;
+import org.apache.metron.hbase.bolt.mapper.ColumnList;
+import org.apache.metron.hbase.bolt.mapper.HBaseMapper;
+import org.apache.metron.hbase.bolt.mapper.HBaseProjectionCriteria;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;