Posted to dev@storm.apache.org by HeartSaVioR <gi...@git.apache.org> on 2017/02/27 12:10:31 UTC

[GitHub] storm pull request #1970: STORM-2383 Support HBase as state backend (1.x)

GitHub user HeartSaVioR opened a pull request:

    https://github.com/apache/storm/pull/1970

    STORM-2383 Support HBase as state backend (1.x)

    * Implement HBase state backend
      * based on the Redis state backend implementation
      * the current implementation stores prepare/commit/txids in a single column family of one table
        * prepare, commit, and txids each have their own row
        * each row stores its entries as a qualifier:value map (not scalable, but guarantees atomicity)
        * the atomicity vs. scalability trade-off can be addressed later
    * Add a 'how to set up' section to the State-checkpointing doc
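To make the row layout in the description concrete, here is a minimal in-memory sketch. It assumes one column family in which each logical map lives in a single row: `<namespace>` for committed state, `<namespace>$prepare` for prepared writes, and `<namespace>$txid` holding the `prepare`/`commit` txids (the same row suffixes and txid keys used by the `HBaseKeyValueState` constructor quoted in this thread). The class and method names are hypothetical, `String` stands in for `byte[]`, and the prepare/commit steps illustrate the idea rather than reproduce the PR's code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical in-memory model of the layout: row key -> (qualifier -> value).
 * Every write replaces a whole row, which is what HBase's single-row atomic
 * mutations would guarantee in the real backend.
 */
class RowLayoutSketch {
    // Stand-in for one HBase table restricted to a single column family.
    final Map<String, Map<String, String>> table = new HashMap<>();
    final String namespace;   // row holding committed state
    final String prepareRow;  // row holding prepared (not yet committed) writes
    final String txidRow;     // row holding the "prepare" and "commit" txids
    Map<String, String> pendingPrepare = new TreeMap<>();

    RowLayoutSketch(String namespace) {
        this.namespace = namespace;
        this.prepareRow = namespace + "$prepare";
        this.txidRow = namespace + "$txid";
    }

    // Replaces one row's full qualifier map in a single step.
    void writeRow(String rowKey, Map<String, String> columns) {
        table.put(rowKey, new TreeMap<>(columns));
    }

    Map<String, String> row(String rowKey) {
        return table.getOrDefault(rowKey, new TreeMap<>());
    }

    void put(String key, String value) {
        pendingPrepare.put(key, value);
    }

    // Phase 1 (illustrative): persist pending writes into the prepare row,
    // then record the prepared txid under the "prepare" qualifier.
    void prepareCommit(long txid) {
        writeRow(prepareRow, pendingPrepare);
        Map<String, String> txids = new TreeMap<>(row(txidRow));
        txids.put("prepare", Long.toString(txid));
        writeRow(txidRow, txids);
        pendingPrepare = new TreeMap<>();
    }

    // Phase 2 (illustrative): fold prepared writes into the state row,
    // record the committed txid, and drop the prepare row.
    void commit(long txid) {
        Map<String, String> state = new TreeMap<>(row(namespace));
        state.putAll(row(prepareRow));
        writeRow(namespace, state);
        Map<String, String> txids = new TreeMap<>(row(txidRow));
        txids.put("commit", Long.toString(txid));
        writeRow(txidRow, txids);
        table.remove(prepareRow);
    }
}
```

Because every key of a namespace sits in one row, each step gets HBase's row-level atomicity for free, but that same row must hold the entire state, which is the scalability limit the description mentions.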
    
    NOTE: Currently I can't run or debug tests in IntelliJ on the master branch, so I worked on the 1.x branch instead. I've heard many others are struggling with this too, so it may be worth addressing soon.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HeartSaVioR/storm STORM-2383-1.x

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1970.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1970
    
----
commit 1ed4779ef42e203f1b3734a060f5aebb2d5b32cc
Author: Jungtaek Lim <ka...@gmail.com>
Date:   2017-02-27T12:01:13Z

    STORM-2383 Support HBase as state backend
    
    * Implement HBase state backend
      * based on the Redis state backend implementation
      * the current implementation stores prepare/commit/txids in a single column family of one table
        * prepare, commit, and txids each have their own row
        * each row stores its entries as a qualifier:value map (not scalable, but guarantees atomicity)
        * the atomicity vs. scalability trade-off can be addressed later
    * Add a 'how to set up' section to the State-checkpointing doc

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1970: STORM-2383 Support HBase as state backend (1.x)

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1970
  
    @arunmahadevan 
    Thanks for reviewing. I addressed your review comments.
    For the iterator, let's handle Redis and HBase together in STORM-2449, since the approach should be similar.



[GitHub] storm issue #1970: STORM-2383 Support HBase as state backend (1.x)

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the issue:

    https://github.com/apache/storm/pull/1970
  
    +1, LGTM



[GitHub] storm pull request #1970: STORM-2383 Support HBase as state backend (1.x)

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1970#discussion_r109284284
  
    --- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueState.java ---
    @@ -0,0 +1,399 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.hbase.state;
    +
    +import org.apache.hadoop.hbase.client.Delete;
    +import org.apache.hadoop.hbase.client.Durability;
    +import org.apache.hadoop.hbase.client.Get;
    +import org.apache.hadoop.hbase.client.Mutation;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
    +import org.apache.storm.hbase.common.ColumnList;
    +import org.apache.storm.hbase.common.HBaseClient;
    +import org.apache.storm.hbase.utils.HBaseEncoder;
    +import org.apache.storm.state.DefaultStateSerializer;
    +import org.apache.storm.state.KeyValueState;
    +import org.apache.storm.state.Serializer;
    +import org.apache.storm.utils.ByteArrayUtil;
    +import org.apache.storm.utils.ByteArrayWrapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableMap;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +/**
    + * A Hbase based implementation that persists the state in HBase.
    + */
    +public class HBaseKeyValueState<K, V> implements KeyValueState<K, V> {
    +    private static final Logger LOG = LoggerFactory.getLogger(HBaseKeyValueState.class);
    +
    +    private static final ByteArrayWrapper COMMIT_TXID_KEY = new ByteArrayWrapper("commit".getBytes());
    +    private static final ByteArrayWrapper PREPARE_TXID_KEY = new ByteArrayWrapper("prepare".getBytes());
    +
    +    private final byte[] namespace;
    +    private final byte[] prepareNamespace;
    +    private final byte[] txidNamespace;
    +    private final byte[] columnFamily;
    +    private final HBaseEncoder<K, V> encoder;
    +    private final HBaseClient hBaseClient;
    +
    +    private Map<ByteArrayWrapper, byte[]> pendingPrepare;
    +    private Map<ByteArrayWrapper, byte[]> pendingCommit;
    +
    +    // the key and value of txIds are guaranteed to be converted to UTF-8 encoded String
    +    private Map<ByteArrayWrapper, byte[]> txIds;
    +
    +    public HBaseKeyValueState(HBaseClient hbaseClient, String columnFamily, String namespace) {
    +        this(hbaseClient, columnFamily, namespace, new DefaultStateSerializer<K>(), new DefaultStateSerializer<V>());
    +    }
    +
    +    public HBaseKeyValueState(HBaseClient hBaseClient, String columnFamily, String namespace,
    +                              Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    +
    +        this.hBaseClient = hBaseClient;
    +        this.columnFamily = columnFamily.getBytes();
    +        this.namespace = namespace.getBytes();
    +        this.prepareNamespace = (namespace + "$prepare").getBytes();
    +        this.txidNamespace = (namespace + "$txid").getBytes();
    +        this.encoder = new HBaseEncoder<K, V>(keySerializer, valueSerializer);
    +        this.pendingPrepare = createPendingPrepareMap();
    +        initTxids();
    +        initPendingCommit();
    +    }
    +
    +    private void initTxids() {
    +        HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
    +        criteria.addColumnFamily(columnFamily);
    +        Get get = hBaseClient.constructGetRequests(txidNamespace, criteria);
    +        try {
    +            Result[] results = hBaseClient.batchGet(Collections.singletonList(get));
    +            Result result = results[0];
    +            if (!result.isEmpty()) {
    +                NavigableMap<byte[], byte[]> familyMap = result.getFamilyMap(columnFamily);
    +                txIds = ByteArrayUtil.Maps.newHashMapWrappingKey(familyMap);
    +            } else {
    +                txIds = new HashMap<>();
    +            }
    +
    +            LOG.debug("initTxids, txIds {}", txIds);
    +        } catch (Exception e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    private void initPendingCommit() {
    +        HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
    +        criteria.addColumnFamily(columnFamily);
    +        Get get = hBaseClient.constructGetRequests(prepareNamespace, criteria);
    +        try {
    +            Result[] results = hBaseClient.batchGet(Collections.singletonList(get));
    +            Result result = results[0];
    +            if (!result.isEmpty()) {
    +                LOG.debug("Loading previously prepared commit from {}", prepareNamespace);
    +                NavigableMap<byte[], byte[]> familyMap = result.getFamilyMap(columnFamily);
    +                pendingCommit = ByteArrayUtil.Maps.newHashMapWrappingKey(familyMap);
    +            } else {
    +                LOG.debug("No previously prepared commits.");
    +                pendingCommit = Collections.emptyMap();
    +            }
    +        } catch (Exception e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    @Override
    +    public void put(K key, V value) {
    +        LOG.debug("put key '{}', value '{}'", key, value);
    +        byte[] columnKey = encoder.encodeKey(key);
    +        byte[] columnValue = encoder.encodeValue(value);
    +        pendingPrepare.put(new ByteArrayWrapper(columnKey), columnValue);
    +    }
    +
    +    @Override
    +    public V get(K key) {
    +        LOG.debug("get key '{}'", key);
    +        byte[] columnKey = encoder.encodeKey(key);
    +        byte[] columnValue = null;
    +
    +        ByteArrayWrapper wrappedKey = new ByteArrayWrapper(columnKey);
    +        if (pendingPrepare.containsKey(wrappedKey)) {
    +            columnValue = pendingPrepare.get(wrappedKey);
    +        } else if (pendingCommit.containsKey(wrappedKey)) {
    +            columnValue = pendingCommit.get(wrappedKey);
    +        } else {
    +            HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
    +            HBaseProjectionCriteria.ColumnMetaData column = new HBaseProjectionCriteria.ColumnMetaData(columnFamily, columnKey);
    +            criteria.addColumn(column);
    +            Get get = hBaseClient.constructGetRequests(namespace, criteria);
    +            try {
    +                Result[] results = hBaseClient.batchGet(Collections.singletonList(get));
    +                Result result = results[0];
    +                columnValue = result.getValue(column.getColumnFamily(), column.getQualifier());
    +            } catch (Exception e) {
    +                throw new RuntimeException(e);
    +            }
    +        }
    +        V value = null;
    +        if (columnValue != null) {
    +            value = encoder.decodeValue(columnValue);
    +        }
    +        LOG.debug("Value for key '{}' is '{}'", key, value);
    +        return value;
    +    }
    +
    +    @Override
    +    public V get(K key, V defaultValue) {
    +        V val = get(key);
    +        return val != null ? val : defaultValue;
    +    }
    +
    +    @Override
    +    public V delete(K key) {
    +        LOG.debug("delete key '{}'", key);
    +        byte[] columnKey = encoder.encodeKey(key);
    +        V curr = get(key);
    +        pendingPrepare.put(new ByteArrayWrapper(columnKey), HBaseEncoder.TOMBSTONE);
    +        return curr;
    +    }
    +
    +    @Override
    +    public Iterator<Map.Entry<K, V>> iterator() {
    +        // TODO: is it possible to implement this correctly?
    --- End diff --
    
    OK. Filed [STORM-2449](https://issues.apache.org/jira/browse/STORM-2449).
    I'll apply the approach to this PR, too.



[GitHub] storm pull request #1970: STORM-2383 Support HBase as state backend (1.x)

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1970#discussion_r108139910
  
    --- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueState.java ---
    +    @Override
    +    public Iterator<Map.Entry<K, V>> iterator() {
    +        // TODO: is it possible to implement this correctly?
    --- End diff --
    
    Once the key-values are stored as rows with the namespace as the prefix, we could do a prefix scan and return an iterator over the results.
    The rowkey can be something like `boltname-taskid$keys:key`; then scan all rows with `boltname-taskid$keys` as the prefix. For `get`, query the specific key, e.g. `boltname-taskid$keys:key1`.
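A minimal sketch of this prefix-scan idea, assuming row keys shaped like `<namespace>$keys:<key>` as suggested above (the later revision of the diff uses a `$key:` suffix instead). A `TreeMap` ordered by unsigned bytes stands in for the HBase table, and all names are hypothetical; no HBase API is called. The point is that a prefix scan is just the range `[prefix, stopRow)`, where `stopRow` is the smallest key sorting after every key that starts with the prefix.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical prefix-scan sketch; a sorted map plays the role of the HBase table. */
class PrefixScanSketch {
    // Unsigned lexicographic byte order, the order HBase sorts row keys in.
    static final Comparator<byte[]> ROW_ORDER = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = (a[i] & 0xff) - (b[i] & 0xff);
            if (c != 0) {
                return c;
            }
        }
        return a.length - b.length;
    };

    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Prefix shared by every state key of one namespace (e.g. "boltname-taskid").
    static byte[] keyPrefix(String namespace) {
        return bytes(namespace + "$keys:");
    }

    // Row key for a point lookup (get) of one encoded state key.
    static byte[] rowKey(String namespace, byte[] stateKey) {
        byte[] prefix = keyPrefix(namespace);
        byte[] row = Arrays.copyOf(prefix, prefix.length + stateKey.length);
        System.arraycopy(stateKey, 0, row, prefix.length, stateKey.length);
        return row;
    }

    // Drop trailing 0xFF bytes, then increment the last remaining byte.
    // Returns null when no such key exists (empty or all-0xFF prefix).
    static byte[] stopRowForPrefix(byte[] prefix) {
        for (int i = prefix.length - 1; i >= 0; i--) {
            if (prefix[i] != (byte) 0xff) {
                byte[] stop = Arrays.copyOf(prefix, i + 1);
                stop[i]++;
                return stop;
            }
        }
        return null;
    }

    // Range scan [prefix, stopRow) over the sorted "table".
    static NavigableMap<byte[], byte[]> scanPrefix(NavigableMap<byte[], byte[]> table, byte[] prefix) {
        byte[] stop = stopRowForPrefix(prefix);
        return stop == null ? table.tailMap(prefix, true) : table.subMap(prefix, true, stop, false);
    }

    static NavigableMap<byte[], byte[]> newTable() {
        return new TreeMap<>(ROW_ORDER);
    }
}
```

With an actual HBase 1.x client, `Scan#setRowPrefixFilter` should set the start and stop rows in essentially this way, so the backend would not need to compute the stop row itself.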



[GitHub] storm pull request #1970: STORM-2383 Support HBase as state backend (1.x)

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1970#discussion_r125626240
  
    --- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueState.java ---
    @@ -0,0 +1,419 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.hbase.state;
    +
    +import com.google.common.collect.Maps;
    +import com.google.common.primitives.UnsignedBytes;
    +import org.apache.hadoop.hbase.client.Delete;
    +import org.apache.hadoop.hbase.client.Durability;
    +import org.apache.hadoop.hbase.client.Get;
    +import org.apache.hadoop.hbase.client.Mutation;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
    +import org.apache.storm.hbase.common.ColumnList;
    +import org.apache.storm.hbase.common.HBaseClient;
    +import org.apache.storm.state.DefaultStateEncoder;
    +import org.apache.storm.state.DefaultStateSerializer;
    +import org.apache.storm.state.KeyValueState;
    +import org.apache.storm.state.Serializer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableMap;
    +import java.util.TreeMap;
    +import java.util.concurrent.ConcurrentNavigableMap;
    +import java.util.concurrent.ConcurrentSkipListMap;
    +
    +/**
    + * A Hbase based implementation that persists the state in HBase.
    + */
    +public class HBaseKeyValueState<K, V> implements KeyValueState<K, V> {
    +    private static final Logger LOG = LoggerFactory.getLogger(HBaseKeyValueState.class);
    +
    +    public static byte[] STATE_QUALIFIER = "s".getBytes();
    +
    +    public static final NavigableMap<byte[], byte[]> EMPTY_PENDING_COMMIT_MAP = Maps.unmodifiableNavigableMap(
    +            new TreeMap<byte[], byte[]>(UnsignedBytes.lexicographicalComparator()));
    +
    +    private static byte[] COMMIT_TXID_KEY = "commit".getBytes();
    +    private static byte[] PREPARE_TXID_KEY = "prepare".getBytes();
    +
    +    private final byte[] keyNamespace;
    +    private final byte[] prepareNamespace;
    +    private final byte[] txidNamespace;
    +    private final String namespace;
    +    private final byte[] columnFamily;
    +    private final DefaultStateEncoder<K, V> encoder;
    +    private final HBaseClient hBaseClient;
    +
    +    private ConcurrentNavigableMap<byte[], byte[]> pendingPrepare;
    +    private NavigableMap<byte[], byte[]> pendingCommit;
    +
    +    // the key and value of txIds are guaranteed to be converted to UTF-8 encoded String
    +    private NavigableMap<byte[], byte[]> txIds;
    +
    +    public HBaseKeyValueState(HBaseClient hbaseClient, String columnFamily, String namespace) {
    +        this(hbaseClient, columnFamily, namespace, new DefaultStateSerializer<K>(),
    +                new DefaultStateSerializer<V>());
    +    }
    +
    +    public HBaseKeyValueState(HBaseClient hBaseClient, String columnFamily, String namespace,
    +                              Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    +
    +        this.hBaseClient = hBaseClient;
    +        this.columnFamily = columnFamily.getBytes();
    +        this.namespace = namespace;
    +        this.keyNamespace = (namespace + "$key:").getBytes();
    +        this.prepareNamespace = (namespace + "$prepare").getBytes();
    +        this.txidNamespace = (namespace + "$txid").getBytes();
    +        this.encoder = new DefaultStateEncoder<K, V>(keySerializer, valueSerializer);
    +        this.pendingPrepare = createPendingPrepareMap();
    +        initTxids();
    +        initPendingCommit();
    +    }
    +
    +    private void initTxids() {
    +        HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
    +        criteria.addColumnFamily(columnFamily);
    +        Get get = hBaseClient.constructGetRequests(txidNamespace, criteria);
    +        try {
    +            Result[] results = hBaseClient.batchGet(Collections.singletonList(get));
    +            Result result = results[0];
    +            if (!result.isEmpty()) {
    +                NavigableMap<byte[], byte[]> familyMap = result.getFamilyMap(columnFamily);
    +                txIds = new TreeMap<>(familyMap);
    +            } else {
    +                txIds = new TreeMap<>(UnsignedBytes.lexicographicalComparator());
    +            }
    +
    +            LOG.debug("initTxids, txIds {}", txIds);
    +        } catch (Exception e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    private void initPendingCommit() {
    +        HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
    +        criteria.addColumnFamily(columnFamily);
    +        Get get = hBaseClient.constructGetRequests(prepareNamespace, criteria);
    +        try {
    +            Result[] results = hBaseClient.batchGet(Collections.singletonList(get));
    +            Result result = results[0];
    +            if (!result.isEmpty()) {
    +                LOG.debug("Loading previously prepared commit from {}", prepareNamespace);
    +                NavigableMap<byte[], byte[]> familyMap = result.getFamilyMap(columnFamily);
    +                pendingCommit = Maps.unmodifiableNavigableMap(familyMap);
    +            } else {
    +                LOG.debug("No previously prepared commits.");
    +                pendingCommit = EMPTY_PENDING_COMMIT_MAP;
    +            }
    +        } catch (Exception e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    @Override
    +    public void put(K key, V value) {
    +        LOG.debug("put key '{}', value '{}'", key, value);
    +        byte[] columnKey = encoder.encodeKey(key);
    +        byte[] columnValue = encoder.encodeValue(value);
    +        pendingPrepare.put(columnKey, columnValue);
    +    }
    +
    +    @Override
    +    public V get(K key) {
    +        LOG.debug("get key '{}'", key);
    +        byte[] columnKey = encoder.encodeKey(key);
    +        byte[] columnValue = null;
    +
    +        if (pendingPrepare.containsKey(columnKey)) {
    +            columnValue = pendingPrepare.get(columnKey);
    +        } else if (pendingCommit.containsKey(columnKey)) {
    +            columnValue = pendingCommit.get(columnKey);
    +        } else {
    +            HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
    +            HBaseProjectionCriteria.ColumnMetaData column = new HBaseProjectionCriteria.ColumnMetaData(columnFamily,
    +                    STATE_QUALIFIER);
    +            criteria.addColumn(column);
    +            Get get = hBaseClient.constructGetRequests(getRowKeyForStateKey(columnKey), criteria);
    +            try {
    +                Result[] results = hBaseClient.batchGet(Collections.singletonList(get));
    +                Result result = results[0];
    +                columnValue = result.getValue(column.getColumnFamily(), column.getQualifier());
    +            } catch (Exception e) {
    +                throw new RuntimeException(e);
    +            }
    +        }
    +        V value = null;
    +        if (columnValue != null) {
    +            value = encoder.decodeValue(columnValue);
    +        }
    +        LOG.debug("Value for key '{}' is '{}'", key, value);
    +        return value;
    +    }
    +
    +    @Override
    +    public V get(K key, V defaultValue) {
    +        V val = get(key);
    +        return val != null ? val : defaultValue;
    +    }
    +
    +    @Override
    +    public V delete(K key) {
    +        LOG.debug("delete key '{}'", key);
    +        byte[] columnKey = encoder.encodeKey(key);
    +        V curr = get(key);
    +        pendingPrepare.put(columnKey, encoder.getTombstoneValue());
    +        return curr;
    +    }
    +
    +    @Override
    +    public Iterator<Map.Entry<K, V>> iterator() {
    +        throw new UnsupportedOperationException("Not supported yet.");
    --- End diff --
    
    Ah, I forgot to replace this line with HBaseKeyValueStateIterator. Will change.
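Whatever replaces this line has to expose the same view that `get()` does. As a hedged illustration only (not `HBaseKeyValueStateIterator`, whose full implementation is not quoted here), this eager sketch merges the three layers in `get()`'s lookup order (pending prepare, then pending commit, then stored rows) and hides deleted keys. It assumes `String` keys and values, models stored HBase rows as a plain sorted map, and uses a hypothetical `TOMBSTONE` string in place of the encoder's tombstone value.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical merged view over the three layers consulted by get(). */
class MergedStateViewSketch {
    // Stand-in for the tombstone marker that delete() writes into pendingPrepare.
    static final String TOMBSTONE = "\u0000TOMBSTONE";

    final NavigableMap<String, String> pendingPrepare = new TreeMap<>();
    final NavigableMap<String, String> pendingCommit = new TreeMap<>();
    final NavigableMap<String, String> stored = new TreeMap<>(); // rows already in HBase

    // Same precedence as the quoted get(): the newest layer wins; tombstones read as absent.
    String get(String key) {
        String v;
        if (pendingPrepare.containsKey(key)) {
            v = pendingPrepare.get(key);
        } else if (pendingCommit.containsKey(key)) {
            v = pendingCommit.get(key);
        } else {
            v = stored.get(key);
        }
        return TOMBSTONE.equals(v) ? null : v;
    }

    // Eager merge: later putAll calls override older layers, then deleted keys are dropped.
    Iterator<Map.Entry<String, String>> iterator() {
        NavigableMap<String, String> merged = new TreeMap<>(stored);
        merged.putAll(pendingCommit);
        merged.putAll(pendingPrepare);
        merged.values().removeIf(TOMBSTONE::equals);
        return merged.entrySet().iterator();
    }
}
```

Copying everything into one map keeps the precedence rules easy to verify, but it loads the whole state into memory; a real iterator would instead walk the prefix-scan results and both pending maps in key order, applying the same rules one entry at a time.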



[GitHub] storm pull request #1970: STORM-2383 Support HBase as state backend (1.x)

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1970#discussion_r125626467
  
    --- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateIterator.java ---
    @@ -0,0 +1,153 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.hbase.state;
    +
    +import java.util.Arrays;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.TreeMap;
    +
    +import com.google.common.primitives.UnsignedBytes;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.ResultScanner;
    +import org.apache.storm.hbase.common.HBaseClient;
    +import org.apache.storm.state.BaseBinaryStateIterator;
    +import org.apache.storm.state.DefaultStateEncoder;
    +import org.apache.storm.state.Serializer;
    +
    +import org.apache.storm.state.StateEncoder;
    +
    +import static org.apache.storm.hbase.state.HBaseKeyValueState.STATE_QUALIFIER;
    +
    +/**
    + * An iterator over {@link HBaseKeyValueState}.
    + */
    +public class HBaseKeyValueStateIterator<K, V> extends BaseBinaryStateIterator<K, V> {
    +
    +    private final byte[] keyNamespace;
    +    private byte[] cursorKey;
    +    private final byte[] endScanKey;
    +    private final byte[] columnFamily;
    +    private final HBaseClient hBaseClient;
    +    private final int chunkSize;
    +    private final StateEncoder<K, V, byte[], byte[]> encoder;
    +
    +    private Iterator<Map.Entry<byte[], byte[]>> cachedResultIterator;
    +
    +    /**
    +     * Constructor.
    +     *
    +     * @param namespace The namespace of State
    +     * @param columnFamily The column family of state
    +     * @param hBaseClient The instance of HBaseClient
    +     * @param pendingPrepareIterator The iterator of pendingPrepare
    +     * @param pendingCommitIterator The iterator of pendingCommit
    +     * @param chunkSize The size of chunk to get entries from HBase
    +     * @param keySerializer The serializer of key
    +     * @param valueSerializer The serializer of value
    +     */
    +    public HBaseKeyValueStateIterator(String namespace, byte[] columnFamily, HBaseClient hBaseClient,
    +                                      Iterator<Map.Entry<byte[], byte[]>> pendingPrepareIterator,
    +                                      Iterator<Map.Entry<byte[], byte[]>> pendingCommitIterator,
    +                                      int chunkSize, Serializer<K> keySerializer,
    +                                      Serializer<V> valueSerializer) {
    +        super(pendingPrepareIterator, pendingCommitIterator);
    +        this.columnFamily = columnFamily;
    +        this.keyNamespace = (namespace + "$key:").getBytes();
    +        this.cursorKey = (namespace + "$key:").getBytes();
    +        this.endScanKey = advanceRow(this.cursorKey);
    --- End diff --
    
    That's the end key for the whole scan. I'll add a comment on this line.
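This stop key bounds a prefix scan over every row that starts with `namespace + "$key:"`. The body of the PR's `advanceRow` is not quoted in this thread. The sketch below shows one common way to compute such an exclusive stop row: increment the last byte that is not `0xFF` and truncate after it. It assumes rows are compared as unsigned bytes, as HBase does. `PrefixEndKeySketch`, `prefixEndKey` and `inScanRange` are hypothetical names, and `Arrays.compareUnsigned` requires Java 9 or later.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// One common way to derive an exclusive stop row for a prefix scan (not necessarily the PR's advanceRow).
class PrefixEndKeySketch {
    /**
     * Returns the smallest key greater than every key that starts with prefix,
     * or null if none exists (the prefix is all 0xFF bytes, so the scan is unbounded).
     */
    static byte[] prefixEndKey(byte[] prefix) {
        // Walk backwards to the last byte that can be incremented without overflowing.
        for (int i = prefix.length - 1; i >= 0; i--) {
            if (prefix[i] != (byte) 0xFF) {
                byte[] end = Arrays.copyOf(prefix, i + 1); // drop the trailing 0xFF bytes
                end[i]++;
                return end;
            }
        }
        return null;
    }

    // True if row lies in [start, endExclusive) under unsigned lexicographic order.
    static boolean inScanRange(byte[] row, byte[] start, byte[] endExclusive) {
        return Arrays.compareUnsigned(row, start) >= 0
                && (endExclusive == null || Arrays.compareUnsigned(row, endExclusive) < 0);
    }

    public static void main(String[] args) {
        byte[] start = "ns$key:".getBytes(StandardCharsets.UTF_8);
        byte[] end = prefixEndKey(start);
        System.out.println(new String(end, StandardCharsets.UTF_8)); // ns$key;
    }
}
```

For the `ns$key:` prefix this yields `ns$key;` (`:` is 0x3A and `;` is 0x3B). Every `ns$key:...` row therefore falls inside the range. Rows that only share the shorter `ns$` prefix, such as `ns$prepare`, sort outside it.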


[GitHub] storm pull request #1970: STORM-2383 Support HBase as state backend (1.x)

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1970#discussion_r125626316
  
    --- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateIterator.java ---
    @@ -0,0 +1,153 @@
    +public class HBaseKeyValueStateIterator<K, V> extends BaseBinaryStateIterator<K, V> {
    --- End diff --
    
    Yes, I intended to do that but forgot to replace it.


[GitHub] storm issue #1970: STORM-2383 Support HBase as state backend (1.x)

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1970
  
    @arunmahadevan 
    Thanks for reviewing. I've addressed the review comments, then squashed and rebased.
    I'll also craft a PR for the master branch.


[GitHub] storm issue #1970: STORM-2383 Support HBase as state backend (1.x)

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1970
  
    Just rebased onto the latest 1.x-branch.


[GitHub] storm pull request #1970: STORM-2383 Support HBase as state backend (1.x)

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/1970


[GitHub] storm pull request #1970: STORM-2383 Support HBase as state backend (1.x)

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1970#discussion_r125624635
  
    --- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateIterator.java ---
    @@ -0,0 +1,153 @@
    +        this.endScanKey = advanceRow(this.cursorKey);
    --- End diff --
    
    Is this the final end key, or the end key for the first chunk? Can you add a clarifying comment here?
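The distinction the question draws can be modelled like this: `endScanKey` stays fixed for the whole scan, while a cursor advances after each chunk of at most `chunkSize` rows. The sketch below is simplified. It assumes a `TreeMap` ordered by unsigned byte comparison stands in for the HBase table. It also assumes each new chunk starts at the smallest key strictly greater than the last one returned, which is that key with a `0x00` byte appended. `ChunkedScanSketch` and `scanInChunks` are hypothetical names; the PR's iterator presumably scans through `HBaseClient` instead.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Simplified model: a TreeMap ordered by unsigned bytes stands in for the HBase table.
class ChunkedScanSketch {
    /**
     * Scans [startKey, endScanKey) in chunks of at most chunkSize rows.
     * endScanKey is fixed for the whole scan; only cursorKey moves between chunks.
     */
    static List<List<String>> scanInChunks(NavigableMap<byte[], byte[]> table,
                                           byte[] startKey, byte[] endScanKey, int chunkSize) {
        List<List<String>> chunks = new ArrayList<>();
        byte[] cursorKey = startKey;
        while (true) {
            List<String> chunk = new ArrayList<>();
            byte[] lastKey = null;
            for (Map.Entry<byte[], byte[]> e : table.subMap(cursorKey, true, endScanKey, false).entrySet()) {
                if (chunk.size() == chunkSize) {
                    break;
                }
                chunk.add(new String(e.getKey(), StandardCharsets.UTF_8));
                lastKey = e.getKey();
            }
            if (chunk.isEmpty()) {
                return chunks; // nothing left between cursorKey and endScanKey
            }
            chunks.add(chunk);
            // Smallest key strictly greater than lastKey: lastKey followed by a 0x00 byte.
            cursorKey = Arrays.copyOf(lastKey, lastKey.length + 1);
        }
    }

    public static void main(String[] args) {
        Comparator<byte[]> unsigned = Arrays::compareUnsigned;
        NavigableMap<byte[], byte[]> table = new TreeMap<>(unsigned);
        for (String k : new String[] {"ns$key:a", "ns$key:b", "ns$key:c", "other"}) {
            table.put(k.getBytes(StandardCharsets.UTF_8), new byte[0]);
        }
        byte[] start = "ns$key:".getBytes(StandardCharsets.UTF_8);
        byte[] end = "ns$key;".getBytes(StandardCharsets.UTF_8);
        System.out.println(scanInChunks(table, start, end, 2)); // [[ns$key:a, ns$key:b], [ns$key:c]]
    }
}
```

With `chunkSize = 2` and three matching rows, the scan returns two chunks and never reaches the `other` row, which sorts after the stop key.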


[GitHub] storm issue #1970: STORM-2383 Support HBase as state backend (1.x)

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1970
  
    I chose this approach (all KVs in one column family in one row) because keeping everything in a single row is the only way HBase guarantees that inserts/deletes are applied atomically.
    
    If we're OK with applying the changeset (in the commit phase) without atomicity, we can make the state scalable by giving each key its own row. The changeset itself still needs to be put atomically, so that we never end up with a partial changeset as the `prepared changeset`. Normally the changeset doesn't grow very large, so this shouldn't hurt scalability much.
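The trade-off described above can be illustrated with a toy model in which only a single-row write is atomic, mirroring HBase's row-level atomicity. Both layouts prepare the same way: the whole changeset goes into one `$prepare` row, so a prepared changeset is never partial. They differ only in how commit applies it. The `$prepare` and `$key:` row names follow the quoted code. `ChangesetLayoutSketch`, the tombstone marker and the value of `STATE_QUALIFIER` are assumptions for illustration, not the PR's implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy model: table = row -> (qualifier -> value); only a write to ONE row is treated as atomic.
class ChangesetLayoutSketch {
    static final String TOMBSTONE = "__TOMBSTONE__"; // hypothetical deletion marker
    static final String STATE_QUALIFIER = "s";       // hypothetical qualifier value

    final Map<String, Map<String, String>> table = new HashMap<>();

    /** Single-row write (atomic in this model, as in HBase). Tombstones become deletes. */
    void applyToRow(String row, Map<String, String> changes) {
        Map<String, String> cells = table.computeIfAbsent(row, r -> new TreeMap<>());
        for (Map.Entry<String, String> e : changes.entrySet()) {
            if (TOMBSTONE.equals(e.getValue())) {
                cells.remove(e.getKey());
            } else {
                cells.put(e.getKey(), e.getValue());
            }
        }
        if (cells.isEmpty()) {
            table.remove(row);
        }
    }

    /** Prepare: store the whole changeset, tombstones included, in ONE row so it is never partial. */
    void prepare(String ns, Map<String, String> changeset) {
        table.put(ns + "$prepare", new TreeMap<>(changeset));
    }

    /** Layout A (this PR): every key is a qualifier of one row, so commit is a single atomic write. */
    void commitSingleRow(String ns) {
        Map<String, String> changeset = table.getOrDefault(ns + "$prepare", Map.of());
        applyToRow(ns, changeset);
        table.remove(ns + "$prepare");
    }

    /** Layout B (scalable alternative): one row per key; each write is atomic, the changeset is not. */
    void commitRowPerKey(String ns) {
        Map<String, String> changeset = table.getOrDefault(ns + "$prepare", Map.of());
        for (Map.Entry<String, String> e : changeset.entrySet()) {
            applyToRow(ns + "$key:" + e.getKey(), Map.of(STATE_QUALIFIER, e.getValue()));
        }
        table.remove(ns + "$prepare");
    }
}
```

In the row-per-key layout, a crash midway through `commitRowPerKey` leaves some keys applied and others not. The `$prepare` row is removed only after all keys are written. Assuming recovery re-applies that row (the quoted `initPendingCommit` loads it back into `pendingCommit`), re-applying should be safe, since puts and tombstone deletes are idempotent.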


[GitHub] storm issue #1970: STORM-2383 Support HBase as state backend (1.x)

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1970
  
    I've updated this PR to apply the latest change from STORM-2369. I'll simply rebase again once STORM-2369 is merged, and will also craft the PR for the master branch.


[GitHub] storm pull request #1970: STORM-2383 Support HBase as state backend (1.x)

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1970#discussion_r109118403
  
    --- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueState.java ---
    @@ -0,0 +1,399 @@
    +
    +package org.apache.storm.hbase.state;
    +
    +import org.apache.hadoop.hbase.client.Delete;
    +import org.apache.hadoop.hbase.client.Durability;
    +import org.apache.hadoop.hbase.client.Get;
    +import org.apache.hadoop.hbase.client.Mutation;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
    +import org.apache.storm.hbase.common.ColumnList;
    +import org.apache.storm.hbase.common.HBaseClient;
    +import org.apache.storm.hbase.utils.HBaseEncoder;
    +import org.apache.storm.state.DefaultStateSerializer;
    +import org.apache.storm.state.KeyValueState;
    +import org.apache.storm.state.Serializer;
    +import org.apache.storm.utils.ByteArrayUtil;
    +import org.apache.storm.utils.ByteArrayWrapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableMap;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +/**
    + * A Hbase based implementation that persists the state in HBase.
    + */
    +public class HBaseKeyValueState<K, V> implements KeyValueState<K, V> {
    +    private static final Logger LOG = LoggerFactory.getLogger(HBaseKeyValueState.class);
    +
    +    private static final ByteArrayWrapper COMMIT_TXID_KEY = new ByteArrayWrapper("commit".getBytes());
    +    private static final ByteArrayWrapper PREPARE_TXID_KEY = new ByteArrayWrapper("prepare".getBytes());
    +
    +    private final byte[] namespace;
    +    private final byte[] prepareNamespace;
    +    private final byte[] txidNamespace;
    +    private final byte[] columnFamily;
    +    private final HBaseEncoder<K, V> encoder;
    +    private final HBaseClient hBaseClient;
    +
    +    private Map<ByteArrayWrapper, byte[]> pendingPrepare;
    +    private Map<ByteArrayWrapper, byte[]> pendingCommit;
    +
    +    // the key and value of txIds are guaranteed to be converted to UTF-8 encoded String
    +    private Map<ByteArrayWrapper, byte[]> txIds;
    +
    +    public HBaseKeyValueState(HBaseClient hbaseClient, String columnFamily, String namespace) {
    +        this(hbaseClient, columnFamily, namespace, new DefaultStateSerializer<K>(), new DefaultStateSerializer<V>());
    +    }
    +
    +    public HBaseKeyValueState(HBaseClient hBaseClient, String columnFamily, String namespace,
    +                              Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    +
    +        this.hBaseClient = hBaseClient;
    +        this.columnFamily = columnFamily.getBytes();
    +        this.namespace = namespace.getBytes();
    +        this.prepareNamespace = (namespace + "$prepare").getBytes();
    +        this.txidNamespace = (namespace + "$txid").getBytes();
    +        this.encoder = new HBaseEncoder<K, V>(keySerializer, valueSerializer);
    +        this.pendingPrepare = createPendingPrepareMap();
    +        initTxids();
    +        initPendingCommit();
    +    }
    +
    +    private void initTxids() {
    +        HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
    +        criteria.addColumnFamily(columnFamily);
    +        Get get = hBaseClient.constructGetRequests(txidNamespace, criteria);
    +        try {
    +            Result[] results = hBaseClient.batchGet(Collections.singletonList(get));
    +            Result result = results[0];
    +            if (!result.isEmpty()) {
    +                NavigableMap<byte[], byte[]> familyMap = result.getFamilyMap(columnFamily);
    +                txIds = ByteArrayUtil.Maps.newHashMapWrappingKey(familyMap);
    +            } else {
    +                txIds = new HashMap<>();
    +            }
    +
    +            LOG.debug("initTxids, txIds {}", txIds);
    +        } catch (Exception e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    private void initPendingCommit() {
    +        HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
    +        criteria.addColumnFamily(columnFamily);
    +        Get get = hBaseClient.constructGetRequests(prepareNamespace, criteria);
    +        try {
    +            Result[] results = hBaseClient.batchGet(Collections.singletonList(get));
    +            Result result = results[0];
    +            if (!result.isEmpty()) {
    +                LOG.debug("Loading previously prepared commit from {}", prepareNamespace);
    +                NavigableMap<byte[], byte[]> familyMap = result.getFamilyMap(columnFamily);
    +                pendingCommit = ByteArrayUtil.Maps.newHashMapWrappingKey(familyMap);
    +            } else {
    +                LOG.debug("No previously prepared commits.");
    +                pendingCommit = Collections.emptyMap();
    +            }
    +        } catch (Exception e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    @Override
    +    public void put(K key, V value) {
    +        LOG.debug("put key '{}', value '{}'", key, value);
    +        byte[] columnKey = encoder.encodeKey(key);
    +        byte[] columnValue = encoder.encodeValue(value);
    +        pendingPrepare.put(new ByteArrayWrapper(columnKey), columnValue);
    +    }
    +
    +    @Override
    +    public V get(K key) {
    +        LOG.debug("get key '{}'", key);
    +        byte[] columnKey = encoder.encodeKey(key);
    +        byte[] columnValue = null;
    +
    +        ByteArrayWrapper wrappedKey = new ByteArrayWrapper(columnKey);
    +        if (pendingPrepare.containsKey(wrappedKey)) {
    +            columnValue = pendingPrepare.get(wrappedKey);
    +        } else if (pendingCommit.containsKey(wrappedKey)) {
    +            columnValue = pendingCommit.get(wrappedKey);
    +        } else {
    +            HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
    +            HBaseProjectionCriteria.ColumnMetaData column = new HBaseProjectionCriteria.ColumnMetaData(columnFamily, columnKey);
    +            criteria.addColumn(column);
    +            Get get = hBaseClient.constructGetRequests(namespace, criteria);
    +            try {
    +                Result[] results = hBaseClient.batchGet(Collections.singletonList(get));
    +                Result result = results[0];
    +                columnValue = result.getValue(column.getColumnFamily(), column.getQualifier());
    +            } catch (Exception e) {
    +                throw new RuntimeException(e);
    +            }
    +        }
    +        V value = null;
    +        if (columnValue != null) {
    +            value = encoder.decodeValue(columnValue);
    +        }
    +        LOG.debug("Value for key '{}' is '{}'", key, value);
    +        return value;
    +    }
    +
    +    @Override
    +    public V get(K key, V defaultValue) {
    +        V val = get(key);
    +        return val != null ? val : defaultValue;
    +    }
    +
    +    @Override
    +    public V delete(K key) {
    +        LOG.debug("delete key '{}'", key);
    +        byte[] columnKey = encoder.encodeKey(key);
    +        V curr = get(key);
    +        pendingPrepare.put(new ByteArrayWrapper(columnKey), HBaseEncoder.TOMBSTONE);
    +        return curr;
    +    }
    +
    +    @Override
    +    public Iterator<Map.Entry<K, V>> iterator() {
    +        // TODO: is it possible to implement this correctly?
    --- End diff --
    
    @arunmahadevan 
    In the Redis implementation, the iterator can return the same key multiple times with different values, which is not what we expect from an Iterator.
    
    Maybe we can make the Iterator skip a KV pair whose key has already been returned. If you think this approach fixes the above issue, I'll apply it to the Redis implementation and then try to migrate it to the HBase state.
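The proposal above is to skip any key that an earlier layer has already produced. It can be sketched as a generic iterator over layers ordered newest first: pendingPrepare, then pendingCommit, then the backing store. This illustrates the idea under two assumptions and is not the PR's implementation. First, keys are objects with value-based `equals`; byte-array keys would need a wrapper such as `ByteArrayWrapper`. Second, a tombstone value marks a deletion. A tombstone is recorded as seen but never emitted, so a pending delete also hides the stored value. `DedupingStateIterator` is a hypothetical name.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;

// Walks the layers newest-first and emits each key at most once, using the newest layer's value.
class DedupingStateIterator<K, V> implements Iterator<Map.Entry<K, V>> {
    private final List<Iterator<Map.Entry<K, V>>> layers;
    private final V tombstone;
    private final Set<K> seen = new HashSet<>();
    private int layerIndex = 0;
    private Map.Entry<K, V> next;

    DedupingStateIterator(List<Iterator<Map.Entry<K, V>>> layersNewestFirst, V tombstone) {
        this.layers = layersNewestFirst;
        this.tombstone = tombstone;
        advance();
    }

    private void advance() {
        next = null;
        while (layerIndex < layers.size()) {
            Iterator<Map.Entry<K, V>> it = layers.get(layerIndex);
            while (it.hasNext()) {
                Map.Entry<K, V> e = it.next();
                // Set.add returns false if a newer layer already produced this key (or deleted it).
                if (seen.add(e.getKey()) && !tombstone.equals(e.getValue())) {
                    next = new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue());
                    return;
                }
            }
            layerIndex++;
        }
    }

    @Override
    public boolean hasNext() {
        return next != null;
    }

    @Override
    public Map.Entry<K, V> next() {
        if (next == null) {
            throw new NoSuchElementException();
        }
        Map.Entry<K, V> result = next;
        advance();
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> pendingPrepare = new LinkedHashMap<>();
        pendingPrepare.put("a", "a-new");
        pendingPrepare.put("b", "<tombstone>");
        Map<String, String> store = new LinkedHashMap<>();
        store.put("a", "a-old");
        store.put("b", "b-old");
        store.put("c", "c-old");
        Iterator<Map.Entry<String, String>> it = new DedupingStateIterator<>(
                List.of(pendingPrepare.entrySet().iterator(), store.entrySet().iterator()), "<tombstone>");
        List<String> out = new ArrayList<>();
        it.forEachRemaining(e -> out.add(e.getKey() + "=" + e.getValue()));
        System.out.println(out); // [a=a-new, c=c-old]
    }
}
```

The `seen` set grows with the number of distinct keys iterated. That is the memory cost of this approach when the state is large.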


[GitHub] storm issue #1970: STORM-2383 Support HBase as state backend (1.x)

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1970
  
    @arunmahadevan Kindly requesting a review.


[GitHub] storm issue #1970: STORM-2383 Support HBase as state backend (1.x)

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1970
  
    I'll implement the HBase state iterator after #1950 is merged.


[GitHub] storm pull request #1970: STORM-2383 Support HBase as state backend (1.x)

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1970#discussion_r109123032
  
    --- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueState.java ---
    @@ -0,0 +1,399 @@
    +
    +package org.apache.storm.hbase.state;
    +
    +import org.apache.hadoop.hbase.client.Delete;
    +import org.apache.hadoop.hbase.client.Durability;
    +import org.apache.hadoop.hbase.client.Get;
    +import org.apache.hadoop.hbase.client.Mutation;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
    +import org.apache.storm.hbase.common.ColumnList;
    +import org.apache.storm.hbase.common.HBaseClient;
    +import org.apache.storm.hbase.utils.HBaseEncoder;
    +import org.apache.storm.state.DefaultStateSerializer;
    +import org.apache.storm.state.KeyValueState;
    +import org.apache.storm.state.Serializer;
    +import org.apache.storm.utils.ByteArrayUtil;
    +import org.apache.storm.utils.ByteArrayWrapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableMap;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +/**
    + * A Hbase based implementation that persists the state in HBase.
    + */
    +public class HBaseKeyValueState<K, V> implements KeyValueState<K, V> {
    +    private static final Logger LOG = LoggerFactory.getLogger(HBaseKeyValueState.class);
    +
    +    private static final ByteArrayWrapper COMMIT_TXID_KEY = new ByteArrayWrapper("commit".getBytes());
    +    private static final ByteArrayWrapper PREPARE_TXID_KEY = new ByteArrayWrapper("prepare".getBytes());
    +
    +    private final byte[] namespace;
    +    private final byte[] prepareNamespace;
    +    private final byte[] txidNamespace;
    +    private final byte[] columnFamily;
    +    private final HBaseEncoder<K, V> encoder;
    +    private final HBaseClient hBaseClient;
    +
    +    private Map<ByteArrayWrapper, byte[]> pendingPrepare;
    +    private Map<ByteArrayWrapper, byte[]> pendingCommit;
    +
    +    // the key and value of txIds are guaranteed to be converted to UTF-8 encoded String
    +    private Map<ByteArrayWrapper, byte[]> txIds;
    +
    +    public HBaseKeyValueState(HBaseClient hbaseClient, String columnFamily, String namespace) {
    +        this(hbaseClient, columnFamily, namespace, new DefaultStateSerializer<K>(), new DefaultStateSerializer<V>());
    +    }
    +
    +    public HBaseKeyValueState(HBaseClient hBaseClient, String columnFamily, String namespace,
    +                              Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    +
    +        this.hBaseClient = hBaseClient;
    +        this.columnFamily = columnFamily.getBytes();
    +        this.namespace = namespace.getBytes();
    +        this.prepareNamespace = (namespace + "$prepare").getBytes();
    +        this.txidNamespace = (namespace + "$txid").getBytes();
    +        this.encoder = new HBaseEncoder<K, V>(keySerializer, valueSerializer);
    +        this.pendingPrepare = createPendingPrepareMap();
    +        initTxids();
    +        initPendingCommit();
    +    }
    +
    +    private void initTxids() {
    +        HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
    +        criteria.addColumnFamily(columnFamily);
    +        Get get = hBaseClient.constructGetRequests(txidNamespace, criteria);
    +        try {
    +            Result[] results = hBaseClient.batchGet(Collections.singletonList(get));
    +            Result result = results[0];
    +            if (!result.isEmpty()) {
    +                NavigableMap<byte[], byte[]> familyMap = result.getFamilyMap(columnFamily);
    +                txIds = ByteArrayUtil.Maps.newHashMapWrappingKey(familyMap);
    +            } else {
    +                txIds = new HashMap<>();
    +            }
    +
    +            LOG.debug("initTxids, txIds {}", txIds);
    +        } catch (Exception e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    private void initPendingCommit() {
    +        HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
    +        criteria.addColumnFamily(columnFamily);
    +        Get get = hBaseClient.constructGetRequests(prepareNamespace, criteria);
    +        try {
    +            Result[] results = hBaseClient.batchGet(Collections.singletonList(get));
    +            Result result = results[0];
    +            if (!result.isEmpty()) {
    +                LOG.debug("Loading previously prepared commit from {}", prepareNamespace);
    +                NavigableMap<byte[], byte[]> familyMap = result.getFamilyMap(columnFamily);
    +                pendingCommit = ByteArrayUtil.Maps.newHashMapWrappingKey(familyMap);
    +            } else {
    +                LOG.debug("No previously prepared commits.");
    +                pendingCommit = Collections.emptyMap();
    +            }
    +        } catch (Exception e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    @Override
    +    public void put(K key, V value) {
    +        LOG.debug("put key '{}', value '{}'", key, value);
    +        byte[] columnKey = encoder.encodeKey(key);
    +        byte[] columnValue = encoder.encodeValue(value);
    +        pendingPrepare.put(new ByteArrayWrapper(columnKey), columnValue);
    +    }
    +
    +    @Override
    +    public V get(K key) {
    +        LOG.debug("get key '{}'", key);
    +        byte[] columnKey = encoder.encodeKey(key);
    +        byte[] columnValue = null;
    +
    +        ByteArrayWrapper wrappedKey = new ByteArrayWrapper(columnKey);
    +        if (pendingPrepare.containsKey(wrappedKey)) {
    +            columnValue = pendingPrepare.get(wrappedKey);
    +        } else if (pendingCommit.containsKey(wrappedKey)) {
    +            columnValue = pendingCommit.get(wrappedKey);
    +        } else {
    +            HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
    +            HBaseProjectionCriteria.ColumnMetaData column = new HBaseProjectionCriteria.ColumnMetaData(columnFamily, columnKey);
    +            criteria.addColumn(column);
    +            Get get = hBaseClient.constructGetRequests(namespace, criteria);
    +            try {
    +                Result[] results = hBaseClient.batchGet(Collections.singletonList(get));
    +                Result result = results[0];
    +                columnValue = result.getValue(column.getColumnFamily(), column.getQualifier());
    +            } catch (Exception e) {
    +                throw new RuntimeException(e);
    +            }
    +        }
    +        V value = null;
    +        if (columnValue != null) {
    +            value = encoder.decodeValue(columnValue);
    +        }
    +        LOG.debug("Value for key '{}' is '{}'", key, value);
    +        return value;
    +    }
    +
    +    @Override
    +    public V get(K key, V defaultValue) {
    +        V val = get(key);
    +        return val != null ? val : defaultValue;
    +    }
    +
    +    @Override
    +    public V delete(K key) {
    +        LOG.debug("delete key '{}'", key);
    +        byte[] columnKey = encoder.encodeKey(key);
    +        V curr = get(key);
    +        pendingPrepare.put(new ByteArrayWrapper(columnKey), HBaseEncoder.TOMBSTONE);
    +        return curr;
    +    }
    +
    +    @Override
    +    public Iterator<Map.Entry<K, V>> iterator() {
    +        // TODO: is it possible to implement this correctly?
    --- End diff --
    
    @HeartSaVioR Nice observation. Is it because entries in the pending maps can get committed to the DB (Redis) while we are iterating? If so, we may need to fix it. While returning values from the pending iterators, we could store their keys in a set and use it to filter out already-returned keys while scanning the DB (Redis).
    
    We put 'tombstone' entries into the pending maps on delete, but I don't see them handled during iteration. Maybe we need to handle this as well.
    
    Once the Redis implementation is fixed, maybe you can refactor it and reuse the common parts for HBase as well.
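The dedup-and-tombstone idea described in the review comment above can be sketched roughly as follows. This is a minimal illustration, not the storm-redis or storm-hbase code: it assumes each layer (pending prepare, pending commit, backing store) can be exposed as a `Map<String, String>`, and `LayeredStateIterator` and `TOMBSTONE` are hypothetical names, the latter standing in for the encoder's real tombstone value.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;

/**
 * Walks state layers ordered newest first (pending prepare, pending commit,
 * backing store). Each key is returned at most once, from the newest layer
 * that contains it; keys whose newest value is a tombstone are skipped.
 */
class LayeredStateIterator implements Iterator<Map.Entry<String, String>> {
    // Hypothetical sentinel standing in for the encoder's tombstone value.
    static final String TOMBSTONE = "__TOMBSTONE__";

    private final List<Iterator<Map.Entry<String, String>>> layers = new ArrayList<>();
    // Keys already returned, or already hidden by a newer tombstone.
    private final Set<String> seen = new HashSet<>();
    private int current = 0;
    private Map.Entry<String, String> next;

    @SafeVarargs
    LayeredStateIterator(Map<String, String>... newestFirst) {
        for (Map<String, String> layer : newestFirst) {
            layers.add(layer.entrySet().iterator());
        }
        advance();
    }

    private void advance() {
        next = null;
        while (current < layers.size()) {
            Iterator<Map.Entry<String, String>> it = layers.get(current);
            while (it.hasNext()) {
                Map.Entry<String, String> e = it.next();
                if (!seen.add(e.getKey())) {
                    continue; // already returned (or deleted) via a newer layer
                }
                if (TOMBSTONE.equals(e.getValue())) {
                    continue; // deleted: remembered in 'seen' but never returned
                }
                next = new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue());
                return;
            }
            current++;
        }
    }

    @Override
    public boolean hasNext() {
        return next != null;
    }

    @Override
    public Map.Entry<String, String> next() {
        if (next == null) {
            throw new NoSuchElementException();
        }
        Map.Entry<String, String> result = next;
        advance();
        return result;
    }
}
```

The trade-off of this approach is that the `seen` set grows with every key visited, so a full iteration over a large state keeps all of its keys in memory.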


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1970: STORM-2383 Support HBase as state backend (1.x)

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1970
  
    @arunmahadevan 
    OK, I can change the implementation to store each KV pair in its own row. I'll update the patch and let you know.
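A rough sketch of the row-per-key layout, assuming the row key is the namespace prefix (`namespace + "$key:"`, the shape of `keyNamespace` in the later revision) followed by the encoded state key, with the value kept under one fixed qualifier. `RowPerKeyLayout` and its methods are hypothetical; the patch's own `getRowKeyForStateKey` is not shown here and may combine the bytes differently.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/**
 * Hypothetical helper for a "one row per state key" layout:
 * row key = keyNamespace ++ encodedKey, value stored under one fixed qualifier.
 */
class RowPerKeyLayout {
    private final byte[] keyNamespace;

    RowPerKeyLayout(String namespace) {
        // Same prefix shape as the revision's keyNamespace field; how it is
        // joined with the key is an assumption of this sketch.
        this.keyNamespace = (namespace + "$key:").getBytes(StandardCharsets.UTF_8);
    }

    /** Builds the row key for an already-encoded state key. */
    byte[] rowKeyFor(byte[] encodedKey) {
        byte[] row = Arrays.copyOf(keyNamespace, keyNamespace.length + encodedKey.length);
        System.arraycopy(encodedKey, 0, row, keyNamespace.length, encodedKey.length);
        return row;
    }

    /** True if the row starts with keyNamespace, i.e. it holds a state key of this namespace. */
    boolean ownsRow(byte[] rowKey) {
        if (rowKey.length < keyNamespace.length) {
            return false;
        }
        for (int i = 0; i < keyNamespace.length; i++) {
            if (rowKey[i] != keyNamespace[i]) {
                return false;
            }
        }
        return true;
    }

    /** Inverse of rowKeyFor: strips the namespace prefix. */
    byte[] stateKeyFrom(byte[] rowKey) {
        if (!ownsRow(rowKey)) {
            throw new IllegalArgumentException("row key is outside this namespace");
        }
        return Arrays.copyOfRange(rowKey, keyNamespace.length, rowKey.length);
    }
}
```

Because the `$prepare` and `$txid` rows do not start with `$key:`, a prefix scan over key rows would not pick them up. HBase only guarantees atomic mutations within a single row, so this layout trades the single-row atomicity mentioned in the PR description for scalability.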


[GitHub] storm issue #1970: STORM-2383 Support HBase as state backend (1.x)

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1970
  
    I'll also craft a PR against the master branch when implementing the HBase state iterator.


[GitHub] storm pull request #1970: STORM-2383 Support HBase as state backend (1.x)

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1970#discussion_r108137761
  
    --- Diff: docs/State-checkpointing.md ---
    @@ -161,3 +164,60 @@ The framework instantiates the state via the corresponding `StateProvider` imple
     a `StateProvider` implementation which can load and return the state based on the namespace. Each state belongs to a unique namespace.
     The namespace is typically unique per task so that each task can have its own state. The StateProvider and the corresponding
     State implementation should be available in the class path of Storm (by placing them in the extlib directory).
    +
    +
    +### Supported State Backends
    +
    +#### Redis
    +
    +* State provider class name (`topology.state.provider`)
    +
    +`org.apache.storm.redis.state.RedisKeyValueStateProvider`
    +
    +* Provider config (`topology.state.provider.config`)
    +
    +```
    + {
    +   "keyClass": "Optional fully qualified class name of the Key type.",
    +   "valueClass": "Optional fully qualified class name of the Value type.",
    +   "keySerializerClass": "Optional Key serializer implementation class.",
    +   "valueSerializerClass": "Optional Value Serializer implementation class.",
    +   "jedisPoolConfig": {
    +     "host": "localhost",
    +     "port": 6379,
    +     "timeout": 2000,
    +     "database": 0,
    +     "password": "xyz"
    +   }
    + }
    + ```
    + 
    +* Artifacts to add (`--artifacts`)
    +
    +`org.apache.storm:storm-redis:<storm-version>`
    +
    +#### HBase
    +
    +NOTE: The HBase state provider uses a pre-created table and column family, so users need to create them and specify them in the provider config.
    --- End diff --
    
    Maybe show an example:
    
    `create 'state', 'cf'`


[GitHub] storm pull request #1970: STORM-2383 Support HBase as state backend (1.x)

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1970#discussion_r108138106
  
    --- Diff: docs/State-checkpointing.md ---
    @@ -161,3 +164,60 @@ The framework instantiates the state via the corresponding `StateProvider` imple
     a `StateProvider` implementation which can load and return the state based on the namespace. Each state belongs to a unique namespace.
     The namespace is typically unique per task so that each task can have its own state. The StateProvider and the corresponding
     State implementation should be available in the class path of Storm (by placing them in the extlib directory).
    +
    +
    +### Supported State Backends
    +
    +#### Redis
    +
    +* State provider class name (`topology.state.provider`)
    +
    +`org.apache.storm.redis.state.RedisKeyValueStateProvider`
    +
    +* Provider config (`topology.state.provider.config`)
    +
    +```
    + {
    +   "keyClass": "Optional fully qualified class name of the Key type.",
    +   "valueClass": "Optional fully qualified class name of the Value type.",
    +   "keySerializerClass": "Optional Key serializer implementation class.",
    +   "valueSerializerClass": "Optional Value Serializer implementation class.",
    +   "jedisPoolConfig": {
    +     "host": "localhost",
    +     "port": 6379,
    +     "timeout": 2000,
    +     "database": 0,
    +     "password": "xyz"
    +   }
    + }
    + ```
    + 
    +* Artifacts to add (`--artifacts`)
    +
    +`org.apache.storm:storm-redis:<storm-version>`
    +
    +#### HBase
    +
    +NOTE: The HBase state provider uses a pre-created table and column family, so users need to create them and specify them in the provider config.
    +
    +* State provider class name (`topology.state.provider`)
    +
    +`org.apache.storm.hbase.state.HBaseKeyValueStateProvider`
    +
    +* Provider config (`topology.state.provider.config`)
    +        
    +```
    + {
    +   "keyClass": "Optional fully qualified class name of the Key type.",
    +   "valueClass": "Optional fully qualified class name of the Value type.",
    +   "keySerializerClass": "Optional Key serializer implementation class.",
    +   "valueSerializerClass": "Optional Value Serializer implementation class.",
    +   "hbaseConfigKey": "config key to load hbase configuration from storm root configuration. (similar to storm-hbase)",
    +   "tableName": "Pre-created table name for state.",
    +   "columnFamily": "Pre-created column family for state."
    + }
    + ```
    --- End diff --
    
    Maybe show an example:
    
    ```java
    Config conf = new Config();
    // HBase client settings; the provider looks them up via "hbaseConfigKey"
    Map<String, Object> hbConf = new HashMap<String, Object>();
    hbConf.put("hbase.rootdir", "file:///tmp/hbase");
    conf.put("hbase.conf", hbConf);
    conf.put("topology.state.provider", "org.apache.storm.hbase.state.HBaseKeyValueStateProvider");
    // "state" and "cf" must match the pre-created table, e.g. create 'state', 'cf'
    conf.put("topology.state.provider.config", "{" +
            "   \"hbaseConfigKey\": \"hbase.conf\"," +
            "   \"tableName\": \"state\"," +
            "   \"columnFamily\": \"cf\"" +
            " }");
    ```


[GitHub] storm pull request #1970: STORM-2383 Support HBase as state backend (1.x)

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1970#discussion_r125612223
  
    --- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateIterator.java ---
    @@ -0,0 +1,153 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.hbase.state;
    +
    +import java.util.Arrays;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.TreeMap;
    +
    +import com.google.common.primitives.UnsignedBytes;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.ResultScanner;
    +import org.apache.storm.hbase.common.HBaseClient;
    +import org.apache.storm.state.BaseBinaryStateIterator;
    +import org.apache.storm.state.DefaultStateEncoder;
    +import org.apache.storm.state.Serializer;
    +
    +import org.apache.storm.state.StateEncoder;
    +
    +import static org.apache.storm.hbase.state.HBaseKeyValueState.STATE_QUALIFIER;
    +
    +/**
    + * An iterator over {@link HBaseKeyValueState}.
    + */
    +public class HBaseKeyValueStateIterator<K, V> extends BaseBinaryStateIterator<K, V> {
    --- End diff --
    
    Can't this be used for `state.iterator()`?
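One way `state.iterator()` could reuse an ordered scan: in the revised patch the pending maps are `NavigableMap`s ordered by `UnsignedBytes.lexicographicalComparator()`, which is the same unsigned byte order HBase uses for row keys. Pending prepare, pending commit and a row scan (with the key prefix stripped) could therefore be merged in a single ordered pass. The k-way merge below is an illustrative sketch, not the code of `BaseBinaryStateIterator`; `SortedMergeIterator` is a hypothetical name, and the comparator is hand-written to mimic Guava's so the example needs only the JDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

class SortedMergeIterator implements Iterator<Map.Entry<byte[], byte[]>> {
    // Unsigned lexicographic byte order, as used for HBase row keys.
    static final Comparator<byte[]> UNSIGNED = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = (a[i] & 0xff) - (b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return a.length - b.length;
    };

    private final List<Iterator<Map.Entry<byte[], byte[]>>> sources;
    // Current head entry of each source (null once that source is exhausted).
    private final List<Map.Entry<byte[], byte[]>> heads = new ArrayList<>();
    private final byte[] tombstone;
    private Map.Entry<byte[], byte[]> next;

    /** Each source must be sorted by UNSIGNED; sources are given newest first. */
    SortedMergeIterator(byte[] tombstone, List<Iterator<Map.Entry<byte[], byte[]>>> newestFirst) {
        this.tombstone = tombstone;
        this.sources = newestFirst;
        for (Iterator<Map.Entry<byte[], byte[]>> it : sources) {
            heads.add(it.hasNext() ? it.next() : null);
        }
        advance();
    }

    private void advance() {
        next = null;
        while (next == null) {
            // Smallest head key wins; strict '<' keeps the newest source on ties.
            int winner = -1;
            for (int i = 0; i < heads.size(); i++) {
                Map.Entry<byte[], byte[]> h = heads.get(i);
                if (h != null && (winner < 0
                        || UNSIGNED.compare(h.getKey(), heads.get(winner).getKey()) < 0)) {
                    winner = i;
                }
            }
            if (winner < 0) {
                return; // every source is exhausted
            }
            Map.Entry<byte[], byte[]> chosen = heads.get(winner);
            // Advance every source positioned on this key so older values are shadowed.
            for (int i = 0; i < heads.size(); i++) {
                Map.Entry<byte[], byte[]> h = heads.get(i);
                if (h != null && UNSIGNED.compare(h.getKey(), chosen.getKey()) == 0) {
                    Iterator<Map.Entry<byte[], byte[]>> it = sources.get(i);
                    heads.set(i, it.hasNext() ? it.next() : null);
                }
            }
            if (!Arrays.equals(chosen.getValue(), tombstone)) {
                next = chosen; // deleted keys are skipped entirely
            }
        }
    }

    @Override
    public boolean hasNext() {
        return next != null;
    }

    @Override
    public Map.Entry<byte[], byte[]> next() {
        if (next == null) {
            throw new NoSuchElementException();
        }
        Map.Entry<byte[], byte[]> result = next;
        advance();
        return result;
    }
}
```

Because sources are passed newest first, on equal keys the pending-prepare value shadows the committed one, and a tombstone hides the key instead of letting an older value resurface. Unlike a seen-key set, this needs only one head entry per source in memory.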


[GitHub] storm pull request #1970: STORM-2383 Support HBase as state backend (1.x)

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1970#discussion_r125611698
  
    --- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueState.java ---
    @@ -0,0 +1,419 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.hbase.state;
    +
    +import com.google.common.collect.Maps;
    +import com.google.common.primitives.UnsignedBytes;
    +import org.apache.hadoop.hbase.client.Delete;
    +import org.apache.hadoop.hbase.client.Durability;
    +import org.apache.hadoop.hbase.client.Get;
    +import org.apache.hadoop.hbase.client.Mutation;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
    +import org.apache.storm.hbase.common.ColumnList;
    +import org.apache.storm.hbase.common.HBaseClient;
    +import org.apache.storm.state.DefaultStateEncoder;
    +import org.apache.storm.state.DefaultStateSerializer;
    +import org.apache.storm.state.KeyValueState;
    +import org.apache.storm.state.Serializer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableMap;
    +import java.util.TreeMap;
    +import java.util.concurrent.ConcurrentNavigableMap;
    +import java.util.concurrent.ConcurrentSkipListMap;
    +
    +/**
    + * A Hbase based implementation that persists the state in HBase.
    + */
    +public class HBaseKeyValueState<K, V> implements KeyValueState<K, V> {
    +    private static final Logger LOG = LoggerFactory.getLogger(HBaseKeyValueState.class);
    +
    +    public static byte[] STATE_QUALIFIER = "s".getBytes();
    +
    +    public static final NavigableMap<byte[], byte[]> EMPTY_PENDING_COMMIT_MAP = Maps.unmodifiableNavigableMap(
    +            new TreeMap<byte[], byte[]>(UnsignedBytes.lexicographicalComparator()));
    +
    +    private static byte[] COMMIT_TXID_KEY = "commit".getBytes();
    +    private static byte[] PREPARE_TXID_KEY = "prepare".getBytes();
    +
    +    private final byte[] keyNamespace;
    +    private final byte[] prepareNamespace;
    +    private final byte[] txidNamespace;
    +    private final String namespace;
    +    private final byte[] columnFamily;
    +    private final DefaultStateEncoder<K, V> encoder;
    +    private final HBaseClient hBaseClient;
    +
    +    private ConcurrentNavigableMap<byte[], byte[]> pendingPrepare;
    +    private NavigableMap<byte[], byte[]> pendingCommit;
    +
    +    // the key and value of txIds are guaranteed to be converted to UTF-8 encoded String
    +    private NavigableMap<byte[], byte[]> txIds;
    +
    +    public HBaseKeyValueState(HBaseClient hbaseClient, String columnFamily, String namespace) {
    +        this(hbaseClient, columnFamily, namespace, new DefaultStateSerializer<K>(),
    +                new DefaultStateSerializer<V>());
    +    }
    +
    +    public HBaseKeyValueState(HBaseClient hBaseClient, String columnFamily, String namespace,
    +                              Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    +
    +        this.hBaseClient = hBaseClient;
    +        this.columnFamily = columnFamily.getBytes();
    +        this.namespace = namespace;
    +        this.keyNamespace = (namespace + "$key:").getBytes();
    +        this.prepareNamespace = (namespace + "$prepare").getBytes();
    +        this.txidNamespace = (namespace + "$txid").getBytes();
    +        this.encoder = new DefaultStateEncoder<K, V>(keySerializer, valueSerializer);
    +        this.pendingPrepare = createPendingPrepareMap();
    +        initTxids();
    +        initPendingCommit();
    +    }
    +
    +    private void initTxids() {
    +        HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
    +        criteria.addColumnFamily(columnFamily);
    +        Get get = hBaseClient.constructGetRequests(txidNamespace, criteria);
    +        try {
    +            Result[] results = hBaseClient.batchGet(Collections.singletonList(get));
    +            Result result = results[0];
    +            if (!result.isEmpty()) {
    +                NavigableMap<byte[], byte[]> familyMap = result.getFamilyMap(columnFamily);
    +                txIds = new TreeMap<>(familyMap);
    +            } else {
    +                txIds = new TreeMap<>(UnsignedBytes.lexicographicalComparator());
    +            }
    +
    +            LOG.debug("initTxids, txIds {}", txIds);
    +        } catch (Exception e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    private void initPendingCommit() {
    +        HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
    +        criteria.addColumnFamily(columnFamily);
    +        Get get = hBaseClient.constructGetRequests(prepareNamespace, criteria);
    +        try {
    +            Result[] results = hBaseClient.batchGet(Collections.singletonList(get));
    +            Result result = results[0];
    +            if (!result.isEmpty()) {
    +                LOG.debug("Loading previously prepared commit from {}", prepareNamespace);
    +                NavigableMap<byte[], byte[]> familyMap = result.getFamilyMap(columnFamily);
    +                pendingCommit = Maps.unmodifiableNavigableMap(familyMap);
    +            } else {
    +                LOG.debug("No previously prepared commits.");
    +                pendingCommit = EMPTY_PENDING_COMMIT_MAP;
    +            }
    +        } catch (Exception e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    @Override
    +    public void put(K key, V value) {
    +        LOG.debug("put key '{}', value '{}'", key, value);
    +        byte[] columnKey = encoder.encodeKey(key);
    +        byte[] columnValue = encoder.encodeValue(value);
    +        pendingPrepare.put(columnKey, columnValue);
    +    }
    +
    +    @Override
    +    public V get(K key) {
    +        LOG.debug("get key '{}'", key);
    +        byte[] columnKey = encoder.encodeKey(key);
    +        byte[] columnValue = null;
    +
    +        if (pendingPrepare.containsKey(columnKey)) {
    +            columnValue = pendingPrepare.get(columnKey);
    +        } else if (pendingCommit.containsKey(columnKey)) {
    +            columnValue = pendingCommit.get(columnKey);
    +        } else {
    +            HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
    +            HBaseProjectionCriteria.ColumnMetaData column = new HBaseProjectionCriteria.ColumnMetaData(columnFamily,
    +                    STATE_QUALIFIER);
    +            criteria.addColumn(column);
    +            Get get = hBaseClient.constructGetRequests(getRowKeyForStateKey(columnKey), criteria);
    +            try {
    +                Result[] results = hBaseClient.batchGet(Collections.singletonList(get));
    +                Result result = results[0];
    +                columnValue = result.getValue(column.getColumnFamily(), column.getQualifier());
    +            } catch (Exception e) {
    +                throw new RuntimeException(e);
    +            }
    +        }
    +        V value = null;
    +        if (columnValue != null) {
    +            value = encoder.decodeValue(columnValue);
    +        }
    +        LOG.debug("Value for key '{}' is '{}'", key, value);
    +        return value;
    +    }
    +
    +    @Override
    +    public V get(K key, V defaultValue) {
    +        V val = get(key);
    +        return val != null ? val : defaultValue;
    +    }
    +
    +    @Override
    +    public V delete(K key) {
    +        LOG.debug("delete key '{}'", key);
    +        byte[] columnKey = encoder.encodeKey(key);
    +        V curr = get(key);
    +        pendingPrepare.put(columnKey, encoder.getTombstoneValue());
    +        return curr;
    +    }
    +
    +    @Override
    +    public Iterator<Map.Entry<K, V>> iterator() {
    +        throw new UnsupportedOperationException("Not supported yet.");
    --- End diff --
    
    Can you raise a follow-up JIRA for this?


[GitHub] storm issue #1970: STORM-2383 Support HBase as state backend (1.x)

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1970
  
    The compilation failure is from STORM-2388. I'll rebase after STORM-2388 is merged.

