Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 20:04:14 UTC
[08/54] [abbrv] [partial] storm git commit: remove files added by mistake
http://git-wip-us.apache.org/repos/asf/storm/blob/d13d1680/_site/target/checkout/external/storm-hbase/LICENSE
----------------------------------------------------------------------
diff --git a/_site/target/checkout/external/storm-hbase/LICENSE b/_site/target/checkout/external/storm-hbase/LICENSE
deleted file mode 100644
index e06d208..0000000
--- a/_site/target/checkout/external/storm-hbase/LICENSE
+++ /dev/null
@@ -1,202 +0,0 @@
-Apache License
- Version 2.0, January 2004
- http://www.apache.org/licenses/
-
- TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
- 1. Definitions.
-
- "License" shall mean the terms and conditions for use, reproduction,
- and distribution as defined by Sections 1 through 9 of this document.
-
- "Licensor" shall mean the copyright owner or entity authorized by
- the copyright owner that is granting the License.
-
- "Legal Entity" shall mean the union of the acting entity and all
- other entities that control, are controlled by, or are under common
- control with that entity. For the purposes of this definition,
- "control" means (i) the power, direct or indirect, to cause the
- direction or management of such entity, whether by contract or
- otherwise, or (ii) ownership of fifty percent (50%) or more of the
- outstanding shares, or (iii) beneficial ownership of such entity.
-
- "You" (or "Your") shall mean an individual or Legal Entity
- exercising permissions granted by this License.
-
- "Source" form shall mean the preferred form for making modifications,
- including but not limited to software source code, documentation
- source, and configuration files.
-
- "Object" form shall mean any form resulting from mechanical
- transformation or translation of a Source form, including but
- not limited to compiled object code, generated documentation,
- and conversions to other media types.
-
- "Work" shall mean the work of authorship, whether in Source or
- Object form, made available under the License, as indicated by a
- copyright notice that is included in or attached to the work
- (an example is provided in the Appendix below).
-
- "Derivative Works" shall mean any work, whether in Source or Object
- form, that is based on (or derived from) the Work and for which the
- editorial revisions, annotations, elaborations, or other modifications
- represent, as a whole, an original work of authorship. For the purposes
- of this License, Derivative Works shall not include works that remain
- separable from, or merely link (or bind by name) to the interfaces of,
- the Work and Derivative Works thereof.
-
- "Contribution" shall mean any work of authorship, including
- the original version of the Work and any modifications or additions
- to that Work or Derivative Works thereof, that is intentionally
- submitted to Licensor for inclusion in the Work by the copyright owner
- or by an individual or Legal Entity authorized to submit on behalf of
- the copyright owner. For the purposes of this definition, "submitted"
- means any form of electronic, verbal, or written communication sent
- to the Licensor or its representatives, including but not limited to
- communication on electronic mailing lists, source code control systems,
- and issue tracking systems that are managed by, or on behalf of, the
- Licensor for the purpose of discussing and improving the Work, but
- excluding communication that is conspicuously marked or otherwise
- designated in writing by the copyright owner as "Not a Contribution."
-
- "Contributor" shall mean Licensor and any individual or Legal Entity
- on behalf of whom a Contribution has been received by Licensor and
- subsequently incorporated within the Work.
-
- 2. Grant of Copyright License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- copyright license to reproduce, prepare Derivative Works of,
- publicly display, publicly perform, sublicense, and distribute the
- Work and such Derivative Works in Source or Object form.
-
- 3. Grant of Patent License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- (except as stated in this section) patent license to make, have made,
- use, offer to sell, sell, import, and otherwise transfer the Work,
- where such license applies only to those patent claims licensable
- by such Contributor that are necessarily infringed by their
- Contribution(s) alone or by combination of their Contribution(s)
- with the Work to which such Contribution(s) was submitted. If You
- institute patent litigation against any entity (including a
- cross-claim or counterclaim in a lawsuit) alleging that the Work
- or a Contribution incorporated within the Work constitutes direct
- or contributory patent infringement, then any patent licenses
- granted to You under this License for that Work shall terminate
- as of the date such litigation is filed.
-
- 4. Redistribution. You may reproduce and distribute copies of the
- Work or Derivative Works thereof in any medium, with or without
- modifications, and in Source or Object form, provided that You
- meet the following conditions:
-
- (a) You must give any other recipients of the Work or
- Derivative Works a copy of this License; and
-
- (b) You must cause any modified files to carry prominent notices
- stating that You changed the files; and
-
- (c) You must retain, in the Source form of any Derivative Works
- that You distribute, all copyright, patent, trademark, and
- attribution notices from the Source form of the Work,
- excluding those notices that do not pertain to any part of
- the Derivative Works; and
-
- (d) If the Work includes a "NOTICE" text file as part of its
- distribution, then any Derivative Works that You distribute must
- include a readable copy of the attribution notices contained
- within such NOTICE file, excluding those notices that do not
- pertain to any part of the Derivative Works, in at least one
- of the following places: within a NOTICE text file distributed
- as part of the Derivative Works; within the Source form or
- documentation, if provided along with the Derivative Works; or,
- within a display generated by the Derivative Works, if and
- wherever such third-party notices normally appear. The contents
- of the NOTICE file are for informational purposes only and
- do not modify the License. You may add Your own attribution
- notices within Derivative Works that You distribute, alongside
- or as an addendum to the NOTICE text from the Work, provided
- that such additional attribution notices cannot be construed
- as modifying the License.
-
- You may add Your own copyright statement to Your modifications and
- may provide additional or different license terms and conditions
- for use, reproduction, or distribution of Your modifications, or
- for any such Derivative Works as a whole, provided Your use,
- reproduction, and distribution of the Work otherwise complies with
- the conditions stated in this License.
-
- 5. Submission of Contributions. Unless You explicitly state otherwise,
- any Contribution intentionally submitted for inclusion in the Work
- by You to the Licensor shall be under the terms and conditions of
- this License, without any additional terms or conditions.
- Notwithstanding the above, nothing herein shall supersede or modify
- the terms of any separate license agreement you may have executed
- with Licensor regarding such Contributions.
-
- 6. Trademarks. This License does not grant permission to use the trade
- names, trademarks, service marks, or product names of the Licensor,
- except as required for reasonable and customary use in describing the
- origin of the Work and reproducing the content of the NOTICE file.
-
- 7. Disclaimer of Warranty. Unless required by applicable law or
- agreed to in writing, Licensor provides the Work (and each
- Contributor provides its Contributions) on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- implied, including, without limitation, any warranties or conditions
- of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
- PARTICULAR PURPOSE. You are solely responsible for determining the
- appropriateness of using or redistributing the Work and assume any
- risks associated with Your exercise of permissions under this License.
-
- 8. Limitation of Liability. In no event and under no legal theory,
- whether in tort (including negligence), contract, or otherwise,
- unless required by applicable law (such as deliberate and grossly
- negligent acts) or agreed to in writing, shall any Contributor be
- liable to You for damages, including any direct, indirect, special,
- incidental, or consequential damages of any character arising as a
- result of this License or out of the use or inability to use the
- Work (including but not limited to damages for loss of goodwill,
- work stoppage, computer failure or malfunction, or any and all
- other commercial damages or losses), even if such Contributor
- has been advised of the possibility of such damages.
-
- 9. Accepting Warranty or Additional Liability. While redistributing
- the Work or Derivative Works thereof, You may choose to offer,
- and charge a fee for, acceptance of support, warranty, indemnity,
- or other liability obligations and/or rights consistent with this
- License. However, in accepting such obligations, You may act only
- on Your own behalf and on Your sole responsibility, not on behalf
- of any other Contributor, and only if You agree to indemnify,
- defend, and hold each Contributor harmless for any liability
- incurred by, or claims asserted against, such Contributor by reason
- of your accepting any such warranty or additional liability.
-
- END OF TERMS AND CONDITIONS
-
- APPENDIX: How to apply the Apache License to your work.
-
- To apply the Apache License to your work, attach the following
- boilerplate notice, with the fields enclosed by brackets "{}"
- replaced with your own identifying information. (Don't include
- the brackets!) The text should be enclosed in the appropriate
- comment syntax for the file format. We also recommend that a
- file or class name and description of purpose be included on the
- same "printed page" as the copyright notice for easier
- identification within third-party archives.
-
- Copyright {yyyy} {name of copyright owner}
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-
http://git-wip-us.apache.org/repos/asf/storm/blob/d13d1680/_site/target/checkout/external/storm-hbase/README.md
----------------------------------------------------------------------
diff --git a/_site/target/checkout/external/storm-hbase/README.md b/_site/target/checkout/external/storm-hbase/README.md
deleted file mode 100644
index a5f252d..0000000
--- a/_site/target/checkout/external/storm-hbase/README.md
+++ /dev/null
@@ -1,215 +0,0 @@
-# Storm HBase
-
-Storm/Trident integration for [Apache HBase](https://hbase.apache.org)
-
-## Usage
-The main API for interacting with HBase is the `org.apache.storm.hbase.bolt.mapper.HBaseMapper`
-interface:
-
-```java
-public interface HBaseMapper extends Serializable {
- byte[] rowKey(Tuple tuple);
-
- ColumnList columns(Tuple tuple);
-}
-```
-
-The `rowKey()` method is straightforward: given a Storm tuple, return a byte array representing the
-row key.
-
-The `columns()` method defines what will be written to an HBase row. The `ColumnList` class allows you
-to add both standard HBase columns and HBase counter columns.
-
-To add a standard column, use one of the `addColumn()` methods:
-
-```java
-ColumnList cols = new ColumnList();
-cols.addColumn(this.columnFamily, field.getBytes(), toBytes(tuple.getValueByField(field)));
-```
-
-To add a counter column, use one of the `addCounter()` methods:
-
-```java
-ColumnList cols = new ColumnList();
-cols.addCounter(this.columnFamily, field.getBytes(), toLong(tuple.getValueByField(field)));
-```
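Both snippets above rely on helpers such as `toBytes()` and `getBytes()` to turn tuple values into the raw byte arrays HBase stores. As a self-contained illustration (the class and helper names below are hypothetical, not part of storm-hbase), the conversions typically work like this: column qualifiers are UTF-8 strings and counter values are 8-byte big-endian longs, the same layout HBase's `Bytes.toBytes` utilities produce.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins for the toBytes()/getBytes() helpers used above.
// HBase stores everything as raw byte arrays: column qualifiers are
// typically UTF-8 strings, and counter cells are 8-byte big-endian longs.
public class EncodingSketch {
    static byte[] toBytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    static byte[] toBytes(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    public static void main(String[] args) {
        System.out.println(toBytes("count").length);  // 5 bytes: c o u n t
        byte[] counter = toBytes(42L);
        System.out.println(counter.length);           // 8
        System.out.println(counter[7]);               // 42 (low-order byte last)
    }
}
```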
-
-### SimpleHBaseMapper
-`storm-hbase` includes a general-purpose `HBaseMapper` implementation called `SimpleHBaseMapper` that can map Storm
-tuples to both regular HBase columns and counter columns.
-
-To use `SimpleHBaseMapper`, you simply tell it which fields to map to which types of columns.
-
-The following code creates a `SimpleHBaseMapper` instance that:
-
-1. Uses the `word` tuple value as a row key.
-2. Adds a standard HBase column for the tuple field `word`.
-3. Adds an HBase counter column for the tuple field `count`.
-4. Writes values to the `cf` column family.
-
-```java
-SimpleHBaseMapper mapper = new SimpleHBaseMapper()
- .withRowKeyField("word")
- .withColumnFields(new Fields("word"))
- .withCounterFields(new Fields("count"))
- .withColumnFamily("cf");
-```
-### HBaseBolt
-To use the `HBaseBolt`, construct it with the name of the table to write to and an `HBaseMapper` implementation:
-
-```java
-HBaseBolt hbase = new HBaseBolt("WordCount", mapper);
-```
-
-The `HBaseBolt` will delegate to the `mapper` instance to figure out how to persist tuple data to HBase.
-
-### HBaseValueMapper
-This class allows you to transform an HBase lookup result into Storm `Values` that will be emitted by the `HBaseLookupBolt`.
-
-```java
-public interface HBaseValueMapper extends Serializable {
- public List<Values> toTuples(Result result) throws Exception;
- void declareOutputFields(OutputFieldsDeclarer declarer);
-}
-```
-
-The `toTuples` method takes an HBase `Result` instance and returns a list of `Values` instances.
-Each of the values returned by this method will be emitted by the `HBaseLookupBolt`.
-
-The `declareOutputFields` method should be used to declare the output fields of the `HBaseLookupBolt`.
-
-There is an example implementation in the `src/test/java` directory.
-
-### HBaseProjectionCriteria
-This class allows you to specify the projection criteria for your HBase `Get` operations. It is an optional parameter
-for the lookup bolt; if you do not specify it, all columns will be returned by the `HBaseLookupBolt`.
-
-```java
-public class HBaseProjectionCriteria implements Serializable {
- public HBaseProjectionCriteria addColumnFamily(String columnFamily);
-    public HBaseProjectionCriteria addColumn(ColumnMetaData column);
-}
-```
-`addColumnFamily` takes a column family name. Specifying a column family means all columns in that family will be included
- in the projection.
-
-`addColumn` takes a `ColumnMetaData` instance. Specifying a column means only that column from its column family
- will be part of your projection.
-The following code creates a projection criteria that:
-
-1. Includes the `count` column from column family `cf`.
-2. Includes all columns from column family `cf2`.
-
-```java
-HBaseProjectionCriteria projectionCriteria = new HBaseProjectionCriteria()
- .addColumn(new HBaseProjectionCriteria.ColumnMetaData("cf", "count"))
- .addColumnFamily("cf2");
-```
-
-### HBaseLookupBolt
-To use the `HBaseLookupBolt`, construct it with the name of the table to read from, an implementation of `HBaseMapper`,
-and an implementation of `HBaseValueMapper`. You can optionally specify an `HBaseProjectionCriteria`.
-
-The `HBaseLookupBolt` will use the mapper to obtain the row key to look up. It will use the `HBaseProjectionCriteria` to
-figure out which columns to include in the result, and it will leverage the `HBaseValueMapper` to get the
-values to be emitted by the bolt.
-
-You can look at an example topology, LookupWordCount.java, under `src/test/java`.
-
-## Example: Persistent Word Count
-A runnable example can be found in the `src/test/java` directory.
-
-### Setup
-The following steps assume you are running HBase locally, or there is an `hbase-site.xml` on the
-classpath pointing to your HBase cluster.
-
-Use the `hbase shell` command to create the schema:
-
-```
-> create 'WordCount', 'cf'
-```
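Counter cells are stored as raw 8-byte values, so a plain `get` in the shell shows unreadable bytes; the shell's `get_counter` command decodes them. A hypothetical check for one word (assuming the `WordCount` table and `cf` family created above) might look like:

```
> get_counter 'WordCount', 'apple', 'cf:count'
```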
-
-### Execution
-Run the `org.apache.storm.hbase.topology.PersistentWordCount` class (it will run the topology for 10 seconds, then exit).
-
-After (or while) the word count topology is running, run the `org.apache.storm.hbase.topology.WordCountClient` class
-to view the counter values stored in HBase. You should see something like the following:
-
-```
-Word: 'apple', Count: 6867
-Word: 'orange', Count: 6645
-Word: 'pineapple', Count: 6954
-Word: 'banana', Count: 6787
-Word: 'watermelon', Count: 6806
-```
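The counts above come from HBase counter cells: each tuple for a given word issues an increment of its `cf:count` cell, so the stored value is the running total across all tuples. A toy, HBase-free model of that accumulation (names here are illustrative only, not part of storm-hbase):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of counter-column accumulation: every tuple for a word
// increments its counter by 1, the way an HBase Increment mutation does.
public class CounterSketch {
    public static void main(String[] args) {
        Map<String, Long> counters = new HashMap<>();
        for (String word : new String[]{"apple", "orange", "apple", "apple"}) {
            counters.merge(word, 1L, Long::sum);
        }
        System.out.println(counters.get("apple"));  // 3
        System.out.println(counters.get("orange")); // 1
    }
}
```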
-
-For reference, the sample topology is listed below:
-
-```java
-public class PersistentWordCount {
- private static final String WORD_SPOUT = "WORD_SPOUT";
- private static final String COUNT_BOLT = "COUNT_BOLT";
- private static final String HBASE_BOLT = "HBASE_BOLT";
-
-
- public static void main(String[] args) throws Exception {
- Config config = new Config();
-
- WordSpout spout = new WordSpout();
- WordCounter bolt = new WordCounter();
-
- SimpleHBaseMapper mapper = new SimpleHBaseMapper()
- .withRowKeyField("word")
- .withColumnFields(new Fields("word"))
- .withCounterFields(new Fields("count"))
- .withColumnFamily("cf");
-
- HBaseBolt hbase = new HBaseBolt("WordCount", mapper);
-
-
- // wordSpout ==> countBolt ==> HBaseBolt
- TopologyBuilder builder = new TopologyBuilder();
-
- builder.setSpout(WORD_SPOUT, spout, 1);
- builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
- builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
-
-
- if (args.length == 0) {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", config, builder.createTopology());
- Thread.sleep(10000);
- cluster.killTopology("test");
- cluster.shutdown();
- System.exit(0);
- } else {
- config.setNumWorkers(3);
- StormSubmitter.submitTopology(args[0], config, builder.createTopology());
- }
- }
-}
-```
-
-
-## License
-
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-
-## Committer Sponsors
-
- * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
- * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
-
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/d13d1680/_site/target/checkout/external/storm-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/_site/target/checkout/external/storm-hbase/pom.xml b/_site/target/checkout/external/storm-hbase/pom.xml
deleted file mode 100644
index da6c00d..0000000
--- a/_site/target/checkout/external/storm-hbase/pom.xml
+++ /dev/null
@@ -1,77 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <artifactId>storm</artifactId>
- <groupId>org.apache.storm</groupId>
- <version>0.9.6</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <artifactId>storm-hbase</artifactId>
-
- <developers>
- <developer>
- <id>ptgoetz</id>
- <name>P. Taylor Goetz</name>
- <email>ptgoetz@gmail.com</email>
- </developer>
- </developers>
-
- <properties>
- <hbase.version>0.98.1-hadoop2</hbase.version>
- <hdfs.version>2.2.0</hdfs.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-client</artifactId>
- <version>${hbase.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <version>${hdfs.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
-</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/d13d1680/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
----------------------------------------------------------------------
diff --git a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java b/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
deleted file mode 100644
index 5f6621b..0000000
--- a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hbase.bolt;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.base.BaseRichBolt;
-import org.apache.commons.lang.Validate;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
-import org.apache.storm.hbase.common.HBaseClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-// TODO support more configuration options, for now we're defaulting to the hbase-*.xml files found on the classpath
-public abstract class AbstractHBaseBolt extends BaseRichBolt {
- private static final Logger LOG = LoggerFactory.getLogger(AbstractHBaseBolt.class);
-
- protected OutputCollector collector;
-
- protected transient HBaseClient hBaseClient;
- protected String tableName;
- protected HBaseMapper mapper;
- protected String configKey;
-
- public AbstractHBaseBolt(String tableName, HBaseMapper mapper) {
- Validate.notEmpty(tableName, "Table name can not be blank or null");
- Validate.notNull(mapper, "mapper can not be null");
- this.tableName = tableName;
- this.mapper = mapper;
- }
-
- @Override
- public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
- this.collector = collector;
- final Configuration hbConfig = HBaseConfiguration.create();
-
- Map<String, Object> conf = (Map<String, Object>)map.get(this.configKey);
- if(conf == null) {
- throw new IllegalArgumentException("HBase configuration not found using key '" + this.configKey + "'");
- }
- if(conf.get("hbase.rootdir") == null) {
- LOG.warn("No 'hbase.rootdir' value found in configuration! Using HBase defaults.");
- }
- for(String key : conf.keySet()) {
- hbConfig.set(key, String.valueOf(conf.get(key)));
- }
-
- this.hBaseClient = new HBaseClient(conf, hbConfig, tableName);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d13d1680/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
----------------------------------------------------------------------
diff --git a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java b/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
deleted file mode 100644
index f7f0886..0000000
--- a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hbase.bolt;
-
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
-import org.apache.storm.hbase.common.ColumnList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- * Basic bolt for writing to HBase.
- *
- * Note: Each HBaseBolt defined in a topology is tied to a specific table.
- *
- */
-public class HBaseBolt extends AbstractHBaseBolt {
- private static final Logger LOG = LoggerFactory.getLogger(HBaseBolt.class);
-
- boolean writeToWAL = true;
-
- public HBaseBolt(String tableName, HBaseMapper mapper) {
- super(tableName, mapper);
- }
-
- public HBaseBolt writeToWAL(boolean writeToWAL) {
- this.writeToWAL = writeToWAL;
- return this;
- }
-
- public HBaseBolt withConfigKey(String configKey) {
- this.configKey = configKey;
- return this;
- }
-
- @Override
- public void execute(Tuple tuple) {
- byte[] rowKey = this.mapper.rowKey(tuple);
- ColumnList cols = this.mapper.columns(tuple);
- List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, cols, writeToWAL? Durability.SYNC_WAL : Durability.SKIP_WAL);
-
- try {
- this.hBaseClient.batchMutate(mutations);
- } catch(Exception e){
- LOG.warn("Failing tuple. Error writing rowKey " + rowKey, e);
- this.collector.fail(tuple);
- return;
- }
-
- this.collector.ack(tuple);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d13d1680/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
----------------------------------------------------------------------
diff --git a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java b/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
deleted file mode 100644
index c6838be..0000000
--- a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hbase.bolt;
-
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.Validate;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
-import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
-import org.apache.storm.hbase.bolt.mapper.HBaseValueMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Basic bolt for querying from HBase.
- *
- * Note: Each HBaseBolt defined in a topology is tied to a specific table.
- *
- */
-public class HBaseLookupBolt extends AbstractHBaseBolt {
- private static final Logger LOG = LoggerFactory.getLogger(HBaseLookupBolt.class);
-
- private HBaseValueMapper rowToTupleMapper;
-
- private HBaseProjectionCriteria projectionCriteria;
-
- public HBaseLookupBolt(String tableName, HBaseMapper mapper, HBaseValueMapper rowToTupleMapper){
- super(tableName, mapper);
- Validate.notNull(rowToTupleMapper, "rowToTupleMapper can not be null");
- this.rowToTupleMapper = rowToTupleMapper;
- }
-
- public HBaseLookupBolt withConfigKey(String configKey){
- this.configKey = configKey;
- return this;
- }
-
- public HBaseLookupBolt withProjectionCriteria(HBaseProjectionCriteria projectionCriteria) {
- this.projectionCriteria = projectionCriteria;
- return this;
- }
-
- @Override
- public void execute(Tuple tuple) {
- byte[] rowKey = this.mapper.rowKey(tuple);
- Get get = hBaseClient.constructGetRequests(rowKey, projectionCriteria);
-
- try {
- Result result = hBaseClient.batchGet(Lists.newArrayList(get))[0];
- for(Values values : rowToTupleMapper.toValues(tuple, result)) {
- this.collector.emit(values);
- }
- this.collector.ack(tuple);
- } catch (Exception e) {
- LOG.warn("Could not perform lookup for rowKey = " + new String(rowKey) + " from HBase.", e);
- this.collector.fail(tuple);
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
- rowToTupleMapper.declareOutputFields(outputFieldsDeclarer);
- }
-}
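Outside the diff context, the execute() contract above (emit one tuple per mapped value, ack on success, fail on any lookup error so Storm can replay) can be sketched with no Storm or HBase dependencies. `LookupSketch`, `Store`, and the `Collector` interface below are hypothetical JDK-only stand-ins, not storm-hbase APIs:

```java
import java.util.List;

public class LookupSketch {
    // Minimal stand-ins for the Storm output collector and the HBase table.
    interface Collector { void emit(List<Object> values); void ack(); void fail(); }
    interface Store { List<List<Object>> lookup(byte[] rowKey) throws Exception; }

    static void execute(byte[] rowKey, Store store, Collector collector) {
        try {
            // Emit one output tuple per mapped row value, then ack the input tuple.
            for (List<Object> values : store.lookup(rowKey)) {
                collector.emit(values);
            }
            collector.ack();
        } catch (Exception e) {
            // Any lookup failure fails the tuple so the topology can replay it.
            collector.fail();
        }
    }
}
```

The key property is that ack and fail are mutually exclusive per input tuple, which is what gives the bolt at-least-once semantics.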
http://git-wip-us.apache.org/repos/asf/storm/blob/d13d1680/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseMapper.java
----------------------------------------------------------------------
diff --git a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseMapper.java b/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseMapper.java
deleted file mode 100644
index 626ce96..0000000
--- a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseMapper.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hbase.bolt.mapper;
-
-
-import backtype.storm.tuple.Tuple;
-import org.apache.storm.hbase.common.ColumnList;
-
-import java.io.Serializable;
-
-/**
- * Maps a <code>backtype.storm.tuple.Tuple</code> object
- * to a row in an HBase table.
- */
-public interface HBaseMapper extends Serializable {
-
- /**
- * Given a tuple, return the HBase row key.
- *
- * @param tuple the input tuple
- * @return the row key as a byte array
- */
- byte[] rowKey(Tuple tuple);
-
- /**
- * Given a tuple, return a list of HBase columns to insert.
- *
- * @param tuple the input tuple
- * @return the columns to write for this tuple
- */
- ColumnList columns(Tuple tuple);
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d13d1680/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseProjectionCriteria.java
----------------------------------------------------------------------
diff --git a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseProjectionCriteria.java b/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseProjectionCriteria.java
deleted file mode 100644
index 81e94b4..0000000
--- a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseProjectionCriteria.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hbase.bolt.mapper;
-
-import com.google.common.collect.Lists;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * Allows the user to specify the projection criteria for an HBase lookup.
- *
- * If only a column family is specified, all columns from that family will be returned.
- * If a column is also specified, only that column from that family will be returned.
- */
-public class HBaseProjectionCriteria implements Serializable {
- private List<byte[]> columnFamilies;
- private List<ColumnMetaData> columns;
-
- public static class ColumnMetaData implements Serializable {
- private byte[] columnFamily;
- private byte[] qualifier;
-
- public ColumnMetaData(String columnFamily, String qualifier) {
- this.columnFamily = columnFamily.getBytes();
- this.qualifier = qualifier.getBytes();
- }
-
- public byte[] getColumnFamily() {
- return columnFamily;
- }
-
- public byte[] getQualifier() {
- return qualifier;
- }
- }
-
- public HBaseProjectionCriteria() {
- columnFamilies = Lists.newArrayList();
- columns = Lists.newArrayList();
- }
-
- /**
- * All columns from this family will be included in the result of the HBase lookup.
- * @param columnFamily the family whose columns should be returned
- * @return this criteria instance, for chaining
- */
- public HBaseProjectionCriteria addColumnFamily(String columnFamily) {
- this.columnFamilies.add(columnFamily.getBytes());
- return this;
- }
-
- /**
- * Only this column from the columnFamily will be included in the result of the HBase lookup.
- * @param column the column to include
- * @return this criteria instance, for chaining
- */
- public HBaseProjectionCriteria addColumn(ColumnMetaData column) {
- this.columns.add(column);
- return this;
- }
-
- public List<ColumnMetaData> getColumns() {
- return columns;
- }
-
- public List<byte[]> getColumnFamilies() {
- return columnFamilies;
- }
-}
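The fluent, accumulate-and-chain pattern that HBaseProjectionCriteria uses can be illustrated with a JDK-only sketch; `ProjectionSketch` and its method names are hypothetical stand-ins for the class above, not the storm-hbase API:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class ProjectionSketch {
    private final List<byte[]> families = new ArrayList<>();
    private final List<byte[][]> columns = new ArrayList<>();

    // Whole-family projection: every column in the family is returned.
    public ProjectionSketch addColumnFamily(String family) {
        families.add(family.getBytes(StandardCharsets.UTF_8));
        return this;
    }

    // Single-column projection: only family:qualifier is returned.
    public ProjectionSketch addColumn(String family, String qualifier) {
        columns.add(new byte[][]{family.getBytes(StandardCharsets.UTF_8),
                                 qualifier.getBytes(StandardCharsets.UTF_8)});
        return this;
    }

    public int familyCount() { return families.size(); }
    public int columnCount() { return columns.size(); }
}
```

Returning `this` from each mutator is what lets callers build the criteria in a single chained expression.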
http://git-wip-us.apache.org/repos/asf/storm/blob/d13d1680/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java
----------------------------------------------------------------------
diff --git a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java b/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java
deleted file mode 100644
index bc38b83..0000000
--- a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hbase.bolt.mapper;
-
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.ITuple;
-import backtype.storm.tuple.Values;
-import org.apache.hadoop.hbase.client.Result;
-
-import java.io.Serializable;
-import java.util.List;
-
-public interface HBaseValueMapper extends Serializable {
- /**
- * Converts an HBase lookup result row into zero or more tuples.
- * @param input the input tuple.
- * @param result the HBase lookup result instance.
- * @return list of values that should be emitted by the lookup bolt.
- * @throws Exception if the result cannot be converted.
- */
- public List<Values> toValues(ITuple input, Result result) throws Exception;
-
- /**
- * Declares the output fields for the lookup bolt.
- * @param declarer the output fields declarer
- */
- void declareOutputFields(OutputFieldsDeclarer declarer);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d13d1680/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/SimpleHBaseMapper.java
----------------------------------------------------------------------
diff --git a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/SimpleHBaseMapper.java b/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/SimpleHBaseMapper.java
deleted file mode 100644
index da0efd4..0000000
--- a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/SimpleHBaseMapper.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hbase.bolt.mapper;
-
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import org.apache.storm.hbase.common.ColumnList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static org.apache.storm.hbase.common.Utils.*;
-
-/**
- * A simple <code>HBaseMapper</code> that maps tuple fields directly to HBase
- * columns and counter columns within a single column family.
- */
-public class SimpleHBaseMapper implements HBaseMapper {
- private static final Logger LOG = LoggerFactory.getLogger(SimpleHBaseMapper.class);
-
- private String rowKeyField;
-// private String timestampField;
- private byte[] columnFamily;
- private Fields columnFields;
- private Fields counterFields;
-
- public SimpleHBaseMapper(){
- }
-
-
- public SimpleHBaseMapper withRowKeyField(String rowKeyField){
- this.rowKeyField = rowKeyField;
- return this;
- }
-
- public SimpleHBaseMapper withColumnFields(Fields columnFields){
- this.columnFields = columnFields;
- return this;
- }
-
- public SimpleHBaseMapper withCounterFields(Fields counterFields){
- this.counterFields = counterFields;
- return this;
- }
-
- public SimpleHBaseMapper withColumnFamily(String columnFamily){
- this.columnFamily = columnFamily.getBytes();
- return this;
- }
-
-// public SimpleHBaseMapper withTimestampField(String timestampField){
-// this.timestampField = timestampField;
-// return this;
-// }
-
- @Override
- public byte[] rowKey(Tuple tuple) {
- Object objVal = tuple.getValueByField(this.rowKeyField);
- return toBytes(objVal);
- }
-
- @Override
- public ColumnList columns(Tuple tuple) {
- ColumnList cols = new ColumnList();
- if(this.columnFields != null){
- // TODO timestamps
- for(String field : this.columnFields){
- cols.addColumn(this.columnFamily, field.getBytes(), toBytes(tuple.getValueByField(field)));
- }
- }
- if(this.counterFields != null){
- for(String field : this.counterFields){
- cols.addCounter(this.columnFamily, field.getBytes(), toLong(tuple.getValueByField(field)));
- }
- }
- return cols;
- }
-}
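SimpleHBaseMapper's job above is purely mechanical: pull one field out as the row key and copy the configured fields into columns. A minimal JDK-only sketch of that mapping, with a plain `Map` standing in for the Storm tuple (the `MapperSketch` name and signatures are hypothetical, not the storm-hbase API):

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class MapperSketch {
    private String rowKeyField;
    private String[] columnFields = new String[0];

    public MapperSketch withRowKeyField(String f) { this.rowKeyField = f; return this; }
    public MapperSketch withColumnFields(String... fs) { this.columnFields = fs; return this; }

    // The tuple is modeled as a field-name -> value map.
    public byte[] rowKey(Map<String, Object> tuple) {
        return String.valueOf(tuple.get(rowKeyField)).getBytes(StandardCharsets.UTF_8);
    }

    // Copy each configured field into the column map, preserving field order.
    public Map<String, Object> columns(Map<String, Object> tuple) {
        Map<String, Object> cols = new LinkedHashMap<>();
        for (String f : columnFields) cols.put(f, tuple.get(f));
        return cols;
    }
}
```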
http://git-wip-us.apache.org/repos/asf/storm/blob/d13d1680/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ColumnList.java
----------------------------------------------------------------------
diff --git a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ColumnList.java b/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ColumnList.java
deleted file mode 100644
index 73703dc..0000000
--- a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ColumnList.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hbase.common;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Represents a list of HBase columns.
- *
- * There are two types of columns, <i>standard</i> and <i>counter</i>.
- *
- * Standard columns have a <i>column family</i> (required), <i>qualifier</i> (optional),
- * <i>timestamp</i> (optional), and <i>value</i> (optional).
- *
- * Counter columns have a <i>column family</i> (required), <i>qualifier</i> (optional),
- * and an <i>increment</i> (optional, but recommended).
- *
- * Inserts/Updates can be added via the <code>addColumn()</code> and <code>addCounter()</code>
- * methods.
- *
- */
-public class ColumnList {
-
- public static abstract class AbstractColumn {
- byte[] family, qualifier;
-
- AbstractColumn(byte[] family, byte[] qualifier){
- this.family = family;
- this.qualifier = qualifier;
- }
-
- public byte[] getFamily() {
- return family;
- }
-
- public byte[] getQualifier() {
- return qualifier;
- }
-
- }
-
- public static class Column extends AbstractColumn {
- byte[] value;
- long ts = -1;
- Column(byte[] family, byte[] qualifier, long ts, byte[] value){
- super(family, qualifier);
- this.value = value;
- this.ts = ts;
- }
-
- public byte[] getValue() {
- return value;
- }
-
- public long getTs() {
- return ts;
- }
- }
-
- public static class Counter extends AbstractColumn {
- long incr = 0;
- Counter(byte[] family, byte[] qualifier, long incr){
- super(family, qualifier);
- this.incr = incr;
- }
-
- public long getIncrement() {
- return incr;
- }
- }
-
-
- private ArrayList<Column> columns;
- private ArrayList<Counter> counters;
-
-
- private ArrayList<Column> columns(){
- if(this.columns == null){
- this.columns = new ArrayList<Column>();
- }
- return this.columns;
- }
-
- private ArrayList<Counter> counters(){
- if(this.counters == null){
- this.counters = new ArrayList<Counter>();
- }
- return this.counters;
- }
-
- /**
- * Add a standard HBase column.
- *
- * @param family the column family
- * @param qualifier the column qualifier
- * @param ts the cell timestamp
- * @param value the cell value
- * @return this column list, for chaining
- */
- public ColumnList addColumn(byte[] family, byte[] qualifier, long ts, byte[] value){
- columns().add(new Column(family, qualifier, ts, value));
- return this;
- }
-
- /**
- * Add a standard HBase column without a timestamp.
- * @param family the column family
- * @param qualifier the column qualifier
- * @param value the cell value
- * @return this column list, for chaining
- */
- public ColumnList addColumn(byte[] family, byte[] qualifier, byte[] value){
- columns().add(new Column(family, qualifier, -1, value));
- return this;
- }
-
- /**
- * Add a standard HBase column given an instance of a class that implements
- * the <code>IColumn</code> interface.
- * @param column the column to add
- * @return this column list, for chaining
- */
- public ColumnList addColumn(IColumn column){
- return this.addColumn(column.family(), column.qualifier(), column.timestamp(), column.value());
- }
-
- /**
- * Add an HBase counter column.
- *
- * @param family the column family
- * @param qualifier the column qualifier
- * @param incr the amount to increment by
- * @return this column list, for chaining
- */
- public ColumnList addCounter(byte[] family, byte[] qualifier, long incr){
- counters().add(new Counter(family, qualifier, incr));
- return this;
- }
-
- /**
- * Add an HBase counter column given an instance of a class that implements the
- * <code>ICounter</code> interface.
- * @param counter the counter to add
- * @return this column list, for chaining
- */
- public ColumnList addCounter(ICounter counter){
- return this.addCounter(counter.family(), counter.qualifier(), counter.increment());
- }
-
-
- /**
- * Query to determine if we have column definitions.
- *
- * @return true if at least one standard column has been added
- */
- public boolean hasColumns(){
- return this.columns != null;
- }
-
- /**
- * Query to determine if we have counter definitions.
- *
- * @return true if at least one counter column has been added
- */
- public boolean hasCounters(){
- return this.counters != null;
- }
-
- /**
- * Get the list of column definitions.
- *
- * @return the standard columns, or null if none were added
- */
- public List<Column> getColumns(){
- return this.columns;
- }
-
- /**
- * Get the list of counter definitions.
- * @return the counter columns, or null if none were added
- */
- public List<Counter> getCounters(){
- return this.counters;
- }
-
-}
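A detail worth noting in ColumnList above: the internal lists are created lazily on first add, so `hasColumns()`/`hasCounters()` are just null checks. A minimal JDK-only sketch of that idiom (the `ColumnListSketch` name is hypothetical, and strings stand in for the byte-array columns):

```java
import java.util.ArrayList;
import java.util.List;

public class ColumnListSketch {
    private List<String> columns;   // created lazily on the first add

    private List<String> columns() {
        if (columns == null) columns = new ArrayList<>();
        return columns;
    }

    public ColumnListSketch addColumn(String col) { columns().add(col); return this; }

    // Null until the first add, so the "has" check is a plain null check.
    public boolean hasColumns() { return columns != null; }
}
```

This is why the HBaseClient code above can branch on `hasColumns()`/`hasCounters()` without allocating empty lists for the unused case.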
http://git-wip-us.apache.org/repos/asf/storm/blob/d13d1680/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java
----------------------------------------------------------------------
diff --git a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java b/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java
deleted file mode 100644
index 94b5d51..0000000
--- a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hbase.common;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
-import org.apache.storm.hbase.security.HBaseSecurityUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.List;
-import java.util.Map;
-
-public class HBaseClient {
- private static final Logger LOG = LoggerFactory.getLogger(HBaseClient.class);
-
- private HTable table;
-
- public HBaseClient(Map<String, Object> map , final Configuration configuration, final String tableName) {
- try {
- UserProvider provider = HBaseSecurityUtil.login(map, configuration);
- this.table = provider.getCurrent().getUGI().doAs(new PrivilegedExceptionAction<HTable>() {
- @Override
- public HTable run() throws IOException {
- return new HTable(configuration, tableName);
- }
- });
- } catch(Exception e) {
- throw new RuntimeException("HBase bolt preparation failed: " + e.getMessage(), e);
- }
- }
-
- public List<Mutation> constructMutationReq(byte[] rowKey, ColumnList cols, Durability durability) {
- List<Mutation> mutations = Lists.newArrayList();
-
- if (cols.hasColumns()) {
- Put put = new Put(rowKey);
- put.setDurability(durability);
- for (ColumnList.Column col : cols.getColumns()) {
- if (col.getTs() > 0) {
- put.add(
- col.getFamily(),
- col.getQualifier(),
- col.getTs(),
- col.getValue()
- );
- } else {
- put.add(
- col.getFamily(),
- col.getQualifier(),
- col.getValue()
- );
- }
- }
- mutations.add(put);
- }
-
- if (cols.hasCounters()) {
- Increment inc = new Increment(rowKey);
- inc.setDurability(durability);
- for (ColumnList.Counter cnt : cols.getCounters()) {
- inc.addColumn(
- cnt.getFamily(),
- cnt.getQualifier(),
- cnt.getIncrement()
- );
- }
- mutations.add(inc);
- }
-
- if (mutations.isEmpty()) {
- mutations.add(new Put(rowKey));
- }
- return mutations;
- }
-
- public void batchMutate(List<Mutation> mutations) throws Exception {
- Object[] result = new Object[mutations.size()];
- try {
- table.batch(mutations, result);
- } catch (InterruptedException e) {
- LOG.warn("Error performing a mutation to HBase.", e);
- throw e;
- } catch (IOException e) {
- LOG.warn("Error performing a mutation to HBase.", e);
- throw e;
- }
- }
-
-
- public Get constructGetRequests(byte[] rowKey, HBaseProjectionCriteria projectionCriteria) {
- Get get = new Get(rowKey);
-
- if (projectionCriteria != null) {
- for (byte[] columnFamily : projectionCriteria.getColumnFamilies()) {
- get.addFamily(columnFamily);
- }
-
- for (HBaseProjectionCriteria.ColumnMetaData columnMetaData : projectionCriteria.getColumns()) {
- get.addColumn(columnMetaData.getColumnFamily(), columnMetaData.getQualifier());
- }
- }
-
- return get;
- }
-
- public Result[] batchGet(List<Get> gets) throws Exception {
- try {
- return table.get(gets);
- } catch (Exception e) {
- LOG.warn("Could not perform HBase lookup.", e);
- throw e;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d13d1680/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/IColumn.java
----------------------------------------------------------------------
diff --git a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/IColumn.java b/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/IColumn.java
deleted file mode 100644
index 36f7e96..0000000
--- a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/IColumn.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hbase.common;
-
-/**
- * Interface definition for classes that support being written to HBase as
- * a regular column.
- *
- */
-public interface IColumn {
- byte[] family();
- byte[] qualifier();
- byte[] value();
- long timestamp();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d13d1680/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ICounter.java
----------------------------------------------------------------------
diff --git a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ICounter.java b/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ICounter.java
deleted file mode 100644
index 43e3f60..0000000
--- a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ICounter.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hbase.common;
-
-/**
- * Interface definition for classes that support being written to HBase as
- * a counter column.
- *
- */
-public interface ICounter {
- byte[] family();
- byte[] qualifier();
- long increment();
-}
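Implementing the small IColumn/ICounter interfaces above is straightforward; here is a hedged, dependency-free sketch where the `Counter` interface is redeclared inline (a hypothetical mirror of ICounter, not the storm-hbase type) and a factory builds instances from strings:

```java
public class CounterSketch {
    // Hypothetical mirror of ICounter, using UTF-8 strings for readability.
    interface Counter { byte[] family(); byte[] qualifier(); long increment(); }

    // Factory producing an anonymous implementation over captured values.
    static Counter of(String family, String qualifier, long incr) {
        return new Counter() {
            public byte[] family() { return family.getBytes(); }
            public byte[] qualifier() { return qualifier.getBytes(); }
            public long increment() { return incr; }
        };
    }
}
```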
http://git-wip-us.apache.org/repos/asf/storm/blob/d13d1680/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java
----------------------------------------------------------------------
diff --git a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java b/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java
deleted file mode 100644
index 52b86c9..0000000
--- a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hbase.common;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.math.BigDecimal;
-
-public class Utils {
- private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
-
- private Utils(){}
-
- public static long toLong(Object obj){
- long l = 0;
- if(obj != null){
- if(obj instanceof Number){
- l = ((Number)obj).longValue();
- } else{
- LOG.warn("Could not coerce {} to Long", obj.getClass().getName());
- }
- }
- return l;
- }
-
- public static byte[] toBytes(Object obj){
- if(obj instanceof String){
- return ((String)obj).getBytes();
- } else if (obj instanceof Integer){
- return Bytes.toBytes((Integer) obj);
- } else if (obj instanceof Long){
- return Bytes.toBytes((Long)obj);
- } else if (obj instanceof Short){
- return Bytes.toBytes((Short)obj);
-
- } else if (obj instanceof Float){
- return Bytes.toBytes((Float)obj);
-
- } else if (obj instanceof Double){
- return Bytes.toBytes((Double)obj);
-
- } else if (obj instanceof Boolean){
- return Bytes.toBytes((Boolean)obj);
-
- } else if (obj instanceof BigDecimal){
- return Bytes.toBytes((BigDecimal)obj);
- } else {
- LOG.error("Can't convert class to byte array: " + obj.getClass().getName());
- return new byte[0];
- }
- }
-}
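The type-dispatch pattern in Utils above (coerce to long with a zero default, serialize by instanceof checks) can be sketched with only the JDK, using `ByteBuffer` as a stand-in for HBase's `Bytes` helper; `BytesSketch` is a hypothetical name and covers only a subset of the types handled above:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BytesSketch {
    // Coerce any Number to long; non-numbers fall back to 0, as in Utils.toLong.
    public static long toLong(Object obj) {
        return (obj instanceof Number) ? ((Number) obj).longValue() : 0L;
    }

    // Type-dispatched serialization; ByteBuffer stands in for HBase's Bytes.
    public static byte[] toBytes(Object obj) {
        if (obj instanceof String)  return ((String) obj).getBytes(StandardCharsets.UTF_8);
        if (obj instanceof Integer) return ByteBuffer.allocate(4).putInt((Integer) obj).array();
        if (obj instanceof Long)    return ByteBuffer.allocate(8).putLong((Long) obj).array();
        if (obj instanceof Double)  return ByteBuffer.allocate(8).putDouble((Double) obj).array();
        return new byte[0]; // unknown types serialize to an empty array
    }
}
```

Note that, like the original, this returns an empty array rather than throwing for unsupported types, which trades a silent data loss for bolt robustness.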
http://git-wip-us.apache.org/repos/asf/storm/blob/d13d1680/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
----------------------------------------------------------------------
diff --git a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java b/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
deleted file mode 100644
index bb53478..0000000
--- a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/security/HBaseSecurityUtil.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hbase.security;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.security.UserGroupInformation;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.Map;
-
-/**
- * This class provides utility methods for the storm-hbase connector to
- * communicate with a secured HBase cluster.
- */
-public class HBaseSecurityUtil {
- public static final String STORM_KEYTAB_FILE_KEY = "storm.keytab.file";
- public static final String STORM_USER_NAME_KEY = "storm.kerberos.principal";
-
- public static UserProvider login(Map conf, Configuration hbaseConfig) throws IOException {
- UserProvider provider = UserProvider.instantiate(hbaseConfig);
- if (UserGroupInformation.isSecurityEnabled()) {
- String keytab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
- if (keytab != null) {
- hbaseConfig.set(STORM_KEYTAB_FILE_KEY, keytab);
- }
- String userName = (String) conf.get(STORM_USER_NAME_KEY);
- if (userName != null) {
- hbaseConfig.set(STORM_USER_NAME_KEY, userName);
- }
- provider.login(STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY,
- InetAddress.getLocalHost().getCanonicalHostName());
- }
- return provider;
- }
-}
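The configuration-copying step of login() above (take the keytab and principal from the topology conf and set them on the HBase-side config, skipping unset keys) can be sketched with plain maps and no Hadoop security classes; `LoginConfigSketch` and `mergeSecurityConf` are hypothetical names for illustration only:

```java
import java.util.HashMap;
import java.util.Map;

public class LoginConfigSketch {
    static final String KEYTAB_KEY = "storm.keytab.file";
    static final String PRINCIPAL_KEY = "storm.kerberos.principal";

    // Copy the two Kerberos settings from the topology conf into the
    // HBase-side config map, skipping keys the user did not set.
    static Map<String, String> mergeSecurityConf(Map<String, Object> stormConf,
                                                 Map<String, String> hbaseConf) {
        Map<String, String> merged = new HashMap<>(hbaseConf);
        for (String key : new String[]{KEYTAB_KEY, PRINCIPAL_KEY}) {
            Object v = stormConf.get(key);
            if (v != null) merged.put(key, (String) v);
        }
        return merged;
    }
}
```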
http://git-wip-us.apache.org/repos/asf/storm/blob/d13d1680/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapper.java
----------------------------------------------------------------------
diff --git a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapper.java b/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapper.java
deleted file mode 100644
index be3ab95..0000000
--- a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapper.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hbase.trident.mapper;
-
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
-import org.apache.storm.hbase.common.ColumnList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.trident.tuple.TridentTuple;
-
-import static org.apache.storm.hbase.common.Utils.toBytes;
-import static org.apache.storm.hbase.common.Utils.toLong;
-
-/**
- * Maps tuple fields directly to HBase columns and counters.
- */
-public class SimpleTridentHBaseMapper implements TridentHBaseMapper {
- private static final Logger LOG = LoggerFactory.getLogger(SimpleTridentHBaseMapper.class);
-
- private String rowKeyField;
- private byte[] columnFamily;
- private Fields columnFields;
- private Fields counterFields;
-
- public SimpleTridentHBaseMapper(){
- }
-
-
- public SimpleTridentHBaseMapper withRowKeyField(String rowKeyField){
- this.rowKeyField = rowKeyField;
- return this;
- }
-
- public SimpleTridentHBaseMapper withColumnFields(Fields columnFields){
- this.columnFields = columnFields;
- return this;
- }
-
- public SimpleTridentHBaseMapper withCounterFields(Fields counterFields){
- this.counterFields = counterFields;
- return this;
- }
-
- public SimpleTridentHBaseMapper withColumnFamily(String columnFamily){
- this.columnFamily = columnFamily.getBytes();
- return this;
- }
-
-
- @Override
- public byte[] rowKey(TridentTuple tuple) {
- Object objVal = tuple.getValueByField(this.rowKeyField);
- return toBytes(objVal);
- }
-
- @Override
- public ColumnList columns(TridentTuple tuple) {
- ColumnList cols = new ColumnList();
- if(this.columnFields != null){
- // TODO timestamps
- for(String field : this.columnFields){
- cols.addColumn(this.columnFamily, field.getBytes(), toBytes(tuple.getValueByField(field)));
- }
- }
- if(this.counterFields != null){
- for(String field : this.counterFields){
- cols.addCounter(this.columnFamily, field.getBytes(), toLong(tuple.getValueByField(field)));
- }
- }
- return cols;
- }
-}
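The mapper above turns each configured tuple field into one HBase cell: the field name becomes the column qualifier and the field value becomes the cell value, all under a single column family. A standalone sketch of that mapping, with a plain `Map` standing in for a `TridentTuple` and a hypothetical `Cell` record standing in for Storm's `ColumnList` (the real Storm/HBase classes are not assumed to be on the classpath):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of SimpleTridentHBaseMapper.columns(): each configured field name
// becomes an HBase qualifier, and its tuple value becomes the cell value.
// Cell and the Map are stand-ins, not Storm/HBase API.
public class ColumnMappingSketch {
    record Cell(String family, String qualifier, byte[] value) {}

    static List<Cell> columns(String family, List<String> columnFields, Map<String, Object> tuple) {
        List<Cell> cols = new ArrayList<>();
        for (String field : columnFields) {
            byte[] value = String.valueOf(tuple.get(field)).getBytes(StandardCharsets.UTF_8);
            cols.add(new Cell(family, field, value));
        }
        return cols;
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("word", "apple");
        tuple.put("count", 3L);
        for (Cell c : columns("cf", List.of("word", "count"), tuple)) {
            System.out.println(c.family() + ":" + c.qualifier() + " = "
                    + new String(c.value(), StandardCharsets.UTF_8));
        }
    }
}
```

Column order follows the configured field list, mirroring how the real mapper iterates `columnFields`.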
http://git-wip-us.apache.org/repos/asf/storm/blob/d13d1680/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapper.java
----------------------------------------------------------------------
diff --git a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapper.java b/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapper.java
deleted file mode 100644
index 64d10d0..0000000
--- a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapper.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.hbase.trident.mapper;
-
-
-import backtype.storm.tuple.Tuple;
-import org.apache.storm.hbase.common.ColumnList;
-import storm.trident.tuple.TridentTuple;
-
-import java.io.Serializable;
-/**
- * Maps a <code>storm.trident.tuple.TridentTuple</code> object
- * to a row in an HBase table.
- */
-public interface TridentHBaseMapper extends Serializable {
-
-
- /**
- * Given a tuple, return the HBase rowkey.
- *
- * @param tuple the incoming Trident tuple
- * @return the HBase row key as a byte array
- */
- byte[] rowKey(TridentTuple tuple);
-
- /**
- * Given a tuple, return a list of HBase columns to insert.
- *
- * @param tuple the incoming Trident tuple
- * @return the list of columns (and counters) to insert
- */
- ColumnList columns(TridentTuple tuple);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d13d1680/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
----------------------------------------------------------------------
diff --git a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java b/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
deleted file mode 100644
index 742206b..0000000
--- a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hbase.trident.state;
-
-import backtype.storm.task.IMetricsContext;
-import backtype.storm.topology.FailedException;
-import backtype.storm.tuple.Values;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.storm.hbase.security.HBaseSecurityUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.trident.state.*;
-import storm.trident.state.map.*;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.io.Serializable;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-
-public class HBaseMapState<T> implements IBackingMap<T> {
- private static final Logger LOG = LoggerFactory.getLogger(HBaseMapState.class);
-
- private int partitionNum;
-
-
- @SuppressWarnings("rawtypes")
- private static final Map<StateType, Serializer> DEFAULT_SERIALZERS = Maps.newHashMap();
-
- static {
- DEFAULT_SERIALZERS.put(StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer());
- DEFAULT_SERIALZERS.put(StateType.TRANSACTIONAL, new JSONTransactionalSerializer());
- DEFAULT_SERIALZERS.put(StateType.OPAQUE, new JSONOpaqueSerializer());
- }
-
- private Options<T> options;
- private Serializer<T> serializer;
- private HTable table;
-
- public HBaseMapState(final Options<T> options, Map map, int partitionNum) {
- this.options = options;
- this.serializer = options.serializer;
- this.partitionNum = partitionNum;
-
- final Configuration hbConfig = HBaseConfiguration.create();
- Map<String, Object> conf = (Map<String, Object>)map.get(options.configKey);
- if(conf == null){
- LOG.info("HBase configuration not found using key '" + options.configKey + "'");
- LOG.info("Using HBase config from first hbase-site.xml found on classpath.");
- } else {
- if (conf.get("hbase.rootdir") == null) {
- LOG.warn("No 'hbase.rootdir' value found in configuration! Using HBase defaults.");
- }
- for (String key : conf.keySet()) {
- hbConfig.set(key, String.valueOf(conf.get(key)));
- }
- }
-
- try{
- UserProvider provider = HBaseSecurityUtil.login(map, hbConfig);
- this.table = provider.getCurrent().getUGI().doAs(new PrivilegedExceptionAction<HTable>() {
- @Override
- public HTable run() throws IOException {
- return new HTable(hbConfig, options.tableName);
- }
- });
- } catch(Exception e){
- throw new RuntimeException("HBase map state preparation failed: " + e.getMessage(), e);
- }
-
- }
-
-
- public static class Options<T> implements Serializable {
-
- public Serializer<T> serializer = null;
- public int cacheSize = 5000;
- public String globalKey = "$HBASE_STATE_GLOBAL$";
- public String configKey = "hbase.config";
- public String tableName;
- public String columnFamily;
- public String qualifier;
- }
-
-
- @SuppressWarnings("rawtypes")
- public static StateFactory opaque() {
- Options<OpaqueValue> options = new Options<OpaqueValue>();
- return opaque(options);
- }
-
- @SuppressWarnings("rawtypes")
- public static StateFactory opaque(Options<OpaqueValue> opts) {
-
- return new Factory(StateType.OPAQUE, opts);
- }
-
- @SuppressWarnings("rawtypes")
- public static StateFactory transactional() {
- Options<TransactionalValue> options = new Options<TransactionalValue>();
- return transactional(options);
- }
-
- @SuppressWarnings("rawtypes")
- public static StateFactory transactional(Options<TransactionalValue> opts) {
- return new Factory(StateType.TRANSACTIONAL, opts);
- }
-
- public static StateFactory nonTransactional() {
- Options<Object> options = new Options<Object>();
- return nonTransactional(options);
- }
-
- public static StateFactory nonTransactional(Options<Object> opts) {
- return new Factory(StateType.NON_TRANSACTIONAL, opts);
- }
-
-
- protected static class Factory implements StateFactory {
- private StateType stateType;
- private Options options;
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- public Factory(StateType stateType, Options options) {
- this.stateType = stateType;
- this.options = options;
-
- if (this.options.serializer == null) {
- this.options.serializer = DEFAULT_SERIALZERS.get(stateType);
- }
-
- if (this.options.serializer == null) {
- throw new RuntimeException("Serializer should be specified for type: " + stateType);
- }
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
- LOG.info("Preparing HBase State for partition {} of {}.", partitionIndex + 1, numPartitions);
- IBackingMap state = new HBaseMapState(options, conf, partitionIndex);
-
- if(options.cacheSize > 0) {
- state = new CachedMap(state, options.cacheSize);
- }
-
- MapState mapState;
- switch (stateType) {
- case NON_TRANSACTIONAL:
- mapState = NonTransactionalMap.build(state);
- break;
- case OPAQUE:
- mapState = OpaqueMap.build(state);
- break;
- case TRANSACTIONAL:
- mapState = TransactionalMap.build(state);
- break;
- default:
- throw new IllegalArgumentException("Unknown state type: " + stateType);
- }
- return new SnapshottableMap(mapState, new Values(options.globalKey));
- }
-
- }
-
- @Override
- public List<T> multiGet(List<List<Object>> keys) {
- List<Get> gets = new ArrayList<Get>();
- for(List<Object> key : keys){
- LOG.info("Partition: {}, GET: {}", this.partitionNum, key);
- Get get = new Get(toRowKey(key));
- get.addColumn(this.options.columnFamily.getBytes(), this.options.qualifier.getBytes());
- gets.add(get);
- }
-
- List<T> retval = new ArrayList<T>();
- try {
- Result[] results = this.table.get(gets);
- for (Result result : results) {
- byte[] value = result.getValue(this.options.columnFamily.getBytes(), this.options.qualifier.getBytes());
- if(value != null) {
- retval.add(this.serializer.deserialize(value));
- } else {
- retval.add(null);
- }
- }
- } catch(IOException e){
- throw new FailedException("IOException while reading from HBase.", e);
- }
- return retval;
- }
-
- @Override
- public void multiPut(List<List<Object>> keys, List<T> values) {
- List<Put> puts = new ArrayList<Put>(keys.size());
- for (int i = 0; i < keys.size(); i++) {
- LOG.info("Partition: {}, Key: {}, Value: {}", new Object[]{this.partitionNum, keys.get(i), new String(this.serializer.serialize(values.get(i)))});
- Put put = new Put(toRowKey(keys.get(i)));
- T val = values.get(i);
- put.add(this.options.columnFamily.getBytes(),
- this.options.qualifier.getBytes(),
- this.serializer.serialize(val));
-
- puts.add(put);
- }
- try {
- this.table.put(puts);
- } catch (InterruptedIOException e) {
- throw new FailedException("Interrupted while writing to HBase", e);
- } catch (RetriesExhaustedWithDetailsException e) {
- throw new FailedException("Retries exhausted while writing to HBase", e);
- }
- }
-
-
- private byte[] toRowKey(List<Object> keys) {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- try {
- for (Object key : keys) {
- bos.write(String.valueOf(key).getBytes());
- }
- bos.close();
- } catch (IOException e){
- throw new RuntimeException("IOException creating HBase row key.", e);
- }
- return bos.toByteArray();
- }
-}
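Note that `toRowKey` above concatenates the string form of each key component with no delimiter, so distinct composite keys such as `["ab", "c"]` and `["a", "bc"]` produce the same row key. A standalone copy of that logic demonstrating the collision (plain Java, no HBase on the classpath):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Standalone copy of HBaseMapState.toRowKey(): concatenates the string form
// of each key component. With no delimiter between components, composite keys
// like ["ab", "c"] and ["a", "bc"] collapse to the same row key ("abc").
public class RowKeySketch {
    static byte[] toRowKey(List<Object> keys) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try {
            for (Object key : keys) {
                bos.write(String.valueOf(key).getBytes(StandardCharsets.UTF_8));
            }
            bos.close();
        } catch (IOException e) {
            throw new RuntimeException("IOException creating HBase row key.", e);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) {
        byte[] a = toRowKey(List.of("ab", "c"));
        byte[] b = toRowKey(List.of("a", "bc"));
        System.out.println(Arrays.equals(a, b)); // prints true -- a collision
    }
}
```

In practice this means grouped keys whose string forms share a boundary ambiguity can silently share state; a length-prefixed or delimiter-escaped encoding would avoid it.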
http://git-wip-us.apache.org/repos/asf/storm/blob/d13d1680/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseQuery.java
----------------------------------------------------------------------
diff --git a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseQuery.java b/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseQuery.java
deleted file mode 100644
index c7836ed..0000000
--- a/_site/target/checkout/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseQuery.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hbase.trident.state;
-
-import backtype.storm.tuple.Values;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseQueryFunction;
-import storm.trident.tuple.TridentTuple;
-
-import java.util.List;
-
-public class HBaseQuery extends BaseQueryFunction<HBaseState, List<Values>> {
-
- @Override
- public List<List<Values>> batchRetrieve(HBaseState hBaseState, List<TridentTuple> tridentTuples) {
- return hBaseState.batchRetrieve(tridentTuples);
- }
-
- @Override
- public void execute(TridentTuple tuples, List<Values> values, TridentCollector tridentCollector) {
- for (Values value : values) {
- tridentCollector.emit(value);
- }
- }
-}
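HBaseQuery delegates the lookup to `HBaseState.batchRetrieve` (not shown in this diff) and then, for each input tuple, emits every `Values` row from that tuple's result list. The `BaseQueryFunction` contract is that `batchRetrieve` returns exactly one result entry per input tuple, in order. A standalone sketch of that fan-out, with plain strings standing in for tuples and `Values`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Sketch of the HBaseQuery pattern: batchRetrieve returns one result list per
// input tuple (same order), and execute() fans each list out as individually
// emitted rows. Strings are stand-ins for TridentTuple and Values.
public class BatchQuerySketch {
    static List<List<String>> batchRetrieve(List<String> tuples, Function<String, List<String>> lookup) {
        List<List<String>> results = new ArrayList<>();
        for (String tuple : tuples) {
            results.add(lookup.apply(tuple)); // one entry per input tuple, in order
        }
        return results;
    }

    static List<String> executeAll(List<String> tuples, List<List<String>> results) {
        List<String> emitted = new ArrayList<>();
        for (List<String> rows : results) {
            for (String row : rows) {
                emitted.add(row); // mirrors execute() emitting each Values row
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> tuples = List.of("k1", "k2");
        List<List<String>> results = batchRetrieve(tuples, k -> List.of(k + "-v1", k + "-v2"));
        System.out.println(executeAll(tuples, results));
    }
}
```

The alignment matters: if `batchRetrieve` returned fewer or reordered entries, Trident would pair results with the wrong input tuples.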