Posted to dev@storm.apache.org by HeartSaVioR <gi...@git.apache.org> on 2017/02/21 14:11:42 UTC

[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

GitHub user HeartSaVioR opened a pull request:

    https://github.com/apache/storm/pull/1950

    STORM-2369 [storm-redis] Use binary type for State management (1.x)

    * introduce a new command interface and relevant containers
      * the new command interface will contain both binary and string commands in the near future
    * change RedisKeyValue to use byte[] instead of String for State key and value management
      * get rid of the Base64 encode/decode in RedisEncoder, and also of SafeEncoder.encode() inside Jedis
    * implement some utils for handling Maps with byte[] keys, since byte[] is a poor Map key type (arrays use identity-based equals/hashCode)
    
    NOTE: Currently I can't run or debug tests in IntelliJ for the master branch (so I worked on the 1.x branch). I've heard many folks are struggling with this too, so it might be better to address it soon.
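The last bullet of the description is about the fact that Java arrays inherit `equals`/`hashCode` from `Object`, so two `byte[]` instances with identical contents count as different `HashMap` keys. The PR's own `ByteArrayWrapper` is not shown in this thread, so the following is only a minimal sketch of a content-based key wrapper under that assumption; `BytesKey` and `BytesKeyDemo` are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical content-based wrapper for byte[] map keys (not the PR's ByteArrayWrapper).
 * Delegates equality and hashing to the array contents instead of the array identity.
 */
final class BytesKey {
    private final byte[] data;

    BytesKey(byte[] data) {
        // The caller must not mutate the array afterwards, or the hash code goes stale.
        this.data = data;
    }

    byte[] unwrap() {
        return data;
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof BytesKey && Arrays.equals(data, ((BytesKey) other).data);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(data);
    }
}

final class BytesKeyDemo {
    /** Raw byte[] keys: two equal-content arrays are stored as two separate entries. */
    static int distinctRawKeys() {
        Map<byte[], String> raw = new HashMap<>();
        raw.put(new byte[] {1, 2}, "a");
        raw.put(new byte[] {1, 2}, "b"); // different instance, so a second entry
        return raw.size();
    }

    /** Wrapped keys: the second put overwrites the first because contents are equal. */
    static int distinctWrappedKeys() {
        Map<BytesKey, String> wrapped = new HashMap<>();
        wrapped.put(new BytesKey(new byte[] {1, 2}), "a");
        wrapped.put(new BytesKey(new byte[] {1, 2}), "b");
        return wrapped.size();
    }
}
```

The cost of this approach is one extra object per key plus the unwrap/rewrap copies when handing maps to a binary Redis API, which is what the first review comment below picks up on.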

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HeartSaVioR/storm STORM-2369-1.x

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1950.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1950
    
----
commit 1c73ceeca7d78fde44cead5d7b9b5b8d0ad9e7e4
Author: Jungtaek Lim <ka...@gmail.com>
Date:   2017-02-21T06:35:58Z

    STORM-2369 [storm-redis] Use binary type for State management
    

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r109658382
  
    --- Diff: external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java ---
    @@ -152,70 +170,72 @@ public V get(K key, V defaultValue) {
         @Override
         public V delete(K key) {
             LOG.debug("delete key '{}'", key);
    -        String redisKey = encoder.encodeKey(key);
    +        byte[] redisKey = encoder.encodeKey(key);
             V curr = get(key);
    -        pendingPrepare.put(redisKey, RedisEncoder.TOMBSTONE);
    +        pendingPrepare.put(new ByteArrayWrapper(redisKey), RedisEncoder.TOMBSTONE);
             return curr;
         }
     
         @Override
         public Iterator<Map.Entry<K, V>> iterator() {
    -        return new RedisKeyValueStateIterator<K, V>(namespace, jedisContainer, pendingPrepare.entrySet().iterator(), pendingCommit.entrySet().iterator(),
    +        return new RedisKeyValueStateIterator<K, V>(namespace, container, pendingPrepare.entrySet().iterator(), pendingCommit.entrySet().iterator(),
                     ITERATOR_CHUNK_SIZE, encoder.getKeySerializer(), encoder.getValueSerializer());
         }
     
         @Override
         public void prepareCommit(long txid) {
             LOG.debug("prepareCommit txid {}", txid);
             validatePrepareTxid(txid);
    -        JedisCommands commands = null;
    +        RedisCommands commands = null;
             try {
    -            Map<String, String> currentPending = pendingPrepare;
    -            pendingPrepare = new ConcurrentHashMap<>();
    -            commands = jedisContainer.getInstance();
    +            Map<ByteArrayWrapper, byte[]> currentPending = pendingPrepare;
    +            pendingPrepare = createPendingPrepareMap();
    +            commands = container.getInstance();
                 if (commands.exists(prepareNamespace)) {
                     LOG.debug("Prepared txn already exists, will merge", txid);
    -                for (Map.Entry<String, String> e: pendingCommit.entrySet()) {
    +                for (Map.Entry<ByteArrayWrapper, byte[]> e: pendingCommit.entrySet()) {
                         if (!currentPending.containsKey(e.getKey())) {
                             currentPending.put(e.getKey(), e.getValue());
                         }
                     }
                 }
                 if (!currentPending.isEmpty()) {
    -                commands.hmset(prepareNamespace, currentPending);
    +                commands.hmset(prepareNamespace, ByteArrayUtil.Maps.newHashMapUnwrappingKey(currentPending));
    --- End diff --
    
    Maybe you can avoid the wrapping/unwrapping and the extra copying if you use a TreeMap with a comparator that compares the byte arrays (e.g. `ConcurrentSkipListMap<>(com.google.common.primitives.UnsignedBytes.lexicographicalComparator())`, since we want concurrency).
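The suggestion can be sketched as follows. Guava's `UnsignedBytes.lexicographicalComparator()` orders arrays byte by byte as unsigned values; to keep the example dependency-free, an equivalent comparator is written by hand here (`ByteArrayKeyedMaps` and `UNSIGNED_LEXICOGRAPHIC` are illustrative names). Because a `ConcurrentSkipListMap` locates keys through its comparator rather than `equals`/`hashCode`, equal-content `byte[]` keys collapse into one entry, and the map can be passed as a `Map<byte[], byte[]>` to an `hmset(byte[], Map<byte[], byte[]>)` call like the one in the diff without unwrapping.

```java
import java.util.Comparator;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

final class ByteArrayKeyedMaps {
    /** Lexicographic order over byte arrays, treating each byte as unsigned (0..255). */
    static final Comparator<byte[]> UNSIGNED_LEXICOGRAPHIC = new Comparator<byte[]>() {
        @Override
        public int compare(byte[] left, byte[] right) {
            int common = Math.min(left.length, right.length);
            for (int i = 0; i < common; i++) {
                int diff = (left[i] & 0xFF) - (right[i] & 0xFF);
                if (diff != 0) {
                    return diff;
                }
            }
            // A strict prefix sorts before the longer array.
            return left.length - right.length;
        }
    };

    /** A concurrent map whose byte[] keys are matched by content, with no wrapper objects. */
    static ConcurrentNavigableMap<byte[], byte[]> newPendingMap() {
        return new ConcurrentSkipListMap<>(UNSIGNED_LEXICOGRAPHIC);
    }
}
```

The anonymous-class comparator keeps the sketch Java 7 compatible, which matters for the 1.x branch. The trade-off versus hashing is O(log n) lookups and inserts, which is usually negligible for per-batch pending maps.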


[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r109814663
  
    --- Diff: external/storm-redis/src/main/java/org/apache/storm/redis/utils/ByteArrayUtil.java ---
    @@ -0,0 +1,61 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.redis.utils;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +public class ByteArrayUtil {
    +    private ByteArrayUtil() {
    +    }
    +
    +    public static class Maps {
    +        public static Map<ByteArrayWrapper, byte[]> newHashMapWrappingKey(Map<byte[], byte[]> source) {
    +            Map<ByteArrayWrapper, byte[]> destination = new HashMap<>(source.size());
    +            wrapEntities(source, destination);
    +            return destination;
    +        }
    +
    +        public static Map<ByteArrayWrapper, byte[]> newConcurrentHashMapWrappingKey(Map<byte[], byte[]> source) {
    --- End diff --
    
    Yes, I'll remove that.


[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r124188548
  
    --- Diff: storm-core/src/jvm/org/apache/storm/state/BaseBinaryStateIterator.java ---
    @@ -0,0 +1,162 @@
    +package org.apache.storm.state;
    +
    +import com.google.common.collect.Iterators;
    +import com.google.common.collect.PeekingIterator;
    +import com.google.common.primitives.UnsignedBytes;
    +
    +import java.util.AbstractMap;
    +import java.util.Arrays;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.NoSuchElementException;
    +import java.util.Set;
    +import java.util.TreeSet;
    +
    +/**
    + * Base implementation of iterator over {@link KeyValueState} which is based on binary type.
    + */
    +public abstract class BaseBinaryStateIterator<K, V> implements Iterator<Map.Entry<K, V>> {
    +
    +  private final PeekingIterator<Map.Entry<byte[], byte[]>> pendingPrepareIterator;
    +  private final PeekingIterator<Map.Entry<byte[], byte[]>> pendingCommitIterator;
    +  private final Set<byte[]> providedKeys;
    +
    +  private boolean firstLoad = true;
    +  private PeekingIterator<Map.Entry<byte[], byte[]>> pendingIterator;
    +  private PeekingIterator<Map.Entry<byte[], byte[]>> cachedResultIterator;
    +
    +  /**
    +   * Constructor.
    +   *
    +   * @param pendingPrepareIterator The iterator of pendingPrepare
    +   * @param pendingCommitIterator The iterator of pendingCommit
    +   */
    +  public BaseBinaryStateIterator(Iterator<Map.Entry<byte[], byte[]>> pendingPrepareIterator,
    +      Iterator<Map.Entry<byte[], byte[]>> pendingCommitIterator) {
    +    this.pendingPrepareIterator = Iterators.peekingIterator(pendingPrepareIterator);
    +    this.pendingCommitIterator = Iterators.peekingIterator(pendingCommitIterator);
    +    this.providedKeys = new TreeSet<>(UnsignedBytes.lexicographicalComparator());
    +  }
    +
    +  @Override
    +  public boolean hasNext() {
    +    if (seekToAvailableEntry(pendingPrepareIterator)) {
    +      pendingIterator = pendingPrepareIterator;
    +      return true;
    +    }
    +
    +    if (seekToAvailableEntry(pendingCommitIterator)) {
    +      pendingIterator = pendingCommitIterator;
    +      return true;
    +    }
    +
    +
    +    if (firstLoad) {
    +      // load the first part of entries
    +      fillCachedResultIterator();
    +      firstLoad = false;
    +    }
    +
    +    while (true) {
    +      if (seekToAvailableEntry(cachedResultIterator)) {
    +        pendingIterator = cachedResultIterator;
    +        return true;
    +      }
    +
    +      if (isEndOfDataFromStorage()) {
    +        break;
    +      }
    +
    +      fillCachedResultIterator();
    +    }
    +
    +    pendingIterator = null;
    +    return false;
    +  }
    +
    +  private void fillCachedResultIterator() {
    +    Iterator<Map.Entry<byte[], byte[]>> iterator = loadChunkFromStateStorage();
    +    if (iterator != null) {
    +      cachedResultIterator = Iterators.peekingIterator(iterator);
    +    } else {
    +      cachedResultIterator = null;
    +    }
    +  }
    +
    +  @Override
    +  public Map.Entry<K, V> next() {
    +    if (!hasNext()) {
    +      throw new NoSuchElementException();
    +    }
    +
    +    Map.Entry<byte[], byte[]> keyValue = pendingIterator.next();
    +
    +    K key = decodeKey(keyValue.getKey());
    +    V value = decodeValue(keyValue.getValue());
    +
    +    providedKeys.add(keyValue.getKey());
    +    return new AbstractMap.SimpleEntry(key, value);
    +  }
    +
    +  @Override
    +  public void remove() {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  /**
    +   * Load some part of state KVs from storage and returns iterator of cached data from storage.
    +   *
    +   * @return Iterator of loaded state KVs
    +   */
    +  protected abstract Iterator<Map.Entry<byte[],byte[]>> loadChunkFromStateStorage();
    +
    +  /**
    +   * Check whether end of data is reached from storage state KVs.
    +   *
    +   * @return whether end of data is reached from storage state KVs
    +   */
    +  protected abstract boolean isEndOfDataFromStorage();
    +
    +  /**
    +   * Decode key to convert byte array to state key type.
    +   *
    +   * @param key byte array encoded key
    +   * @return Decoded value of key
    +   */
    +  protected abstract K decodeKey(byte[] key);
    --- End diff --
    
    Same as above.
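For context, `BaseBinaryStateIterator` serves entries from three layers in priority order (`pendingPrepare`, then `pendingCommit`, then chunks loaded from storage) and records every returned key in `providedKeys`, a `TreeSet` ordered by `UnsignedBytes.lexicographicalComparator()`; a `HashSet<byte[]>` would compare by identity and never detect duplicates. The body of `seekToAvailableEntry` is not shown in the diff, so the sketch below assumes that a key already seen in a higher-priority layer is skipped, and that a tombstone marks its key as seen so stale lower-layer values stay hidden. It merges eagerly for brevity; `LayeredStateMerge` and the `tombstone` parameter are hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class LayeredStateMerge {
    // Unsigned lexicographic order, equivalent in spirit to Guava's UnsignedBytes.lexicographicalComparator().
    private static final Comparator<byte[]> BYTES_ORDER = new Comparator<byte[]>() {
        @Override
        public int compare(byte[] a, byte[] b) {
            int n = Math.min(a.length, b.length);
            for (int i = 0; i < n; i++) {
                int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
                if (diff != 0) {
                    return diff;
                }
            }
            return a.length - b.length;
        }
    };

    /**
     * Merges layers given in priority order (e.g. pendingPrepare, pendingCommit, storage).
     * The first layer that mentions a key wins; a tombstone hides the key in all later layers.
     */
    static List<Map.Entry<byte[], byte[]>> merge(List<Iterator<Map.Entry<byte[], byte[]>>> layers,
                                                 byte[] tombstone) {
        Set<byte[]> providedKeys = new TreeSet<>(BYTES_ORDER);
        List<Map.Entry<byte[], byte[]>> result = new ArrayList<>();
        for (Iterator<Map.Entry<byte[], byte[]>> layer : layers) {
            while (layer.hasNext()) {
                Map.Entry<byte[], byte[]> entry = layer.next();
                // add() returns false if an equal-content key was already seen in a higher layer.
                if (!providedKeys.add(entry.getKey())) {
                    continue;
                }
                // Tombstoned keys stay in providedKeys but are not emitted.
                if (!Arrays.equals(tombstone, entry.getValue())) {
                    result.add(new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), entry.getValue()));
                }
            }
        }
        return result;
    }
}
```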


[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r124186673
  
    --- Diff: external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java ---
    @@ -64,75 +75,83 @@ public RedisKeyValueState(String namespace, JedisPoolConfig poolConfig) {
         }
     
         public RedisKeyValueState(String namespace, JedisPoolConfig poolConfig, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    -        this(namespace, JedisCommandsContainerBuilder.build(poolConfig), keySerializer, valueSerializer);
    +        this(namespace, RedisCommandsContainerBuilder.build(poolConfig), keySerializer, valueSerializer);
         }
     
    -    public RedisKeyValueState(String namespace, JedisCommandsInstanceContainer jedisContainer,
    +    public RedisKeyValueState(String namespace, JedisClusterConfig jedisClusterConfig, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    +        this(namespace, RedisCommandsContainerBuilder.build(jedisClusterConfig), keySerializer, valueSerializer);
    +    }
    +
    +    public RedisKeyValueState(String namespace, RedisCommandsInstanceContainer container,
                                   Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    -        this.namespace = namespace;
    -        this.prepareNamespace = namespace + "$prepare";
    +        this.namespace = SafeEncoder.encode(namespace);
    +        this.prepareNamespace = SafeEncoder.encode(namespace + "$prepare");
             this.txidNamespace = namespace + "$txid";
    -        this.encoder = new RedisEncoder<K, V>(keySerializer, valueSerializer);
    -        this.jedisContainer = jedisContainer;
    -        this.pendingPrepare = new ConcurrentHashMap<>();
    +        this.encoder = new DefaultStateEncoder<K, V>(keySerializer, valueSerializer);
    +        this.container = container;
    +        this.pendingPrepare = createPendingPrepareMap();
             initTxids();
             initPendingCommit();
         }
     
         private void initTxids() {
    -        JedisCommands commands = null;
    +        RedisCommands commands = null;
             try {
    -            commands = jedisContainer.getInstance();
    +            commands = container.getInstance();
                 if (commands.exists(txidNamespace)) {
                     txIds = commands.hgetAll(txidNamespace);
                 } else {
                     txIds = new HashMap<>();
                 }
                 LOG.debug("initTxids, txIds {}", txIds);
             } finally {
    -            jedisContainer.returnInstance(commands);
    +            container.returnInstance(commands);
             }
         }
     
         private void initPendingCommit() {
    -        JedisCommands commands = null;
    +        RedisCommands commands = null;
             try {
    -            commands = jedisContainer.getInstance();
    +            commands = container.getInstance();
                 if (commands.exists(prepareNamespace)) {
                     LOG.debug("Loading previously prepared commit from {}", prepareNamespace);
    -                pendingCommit = Collections.unmodifiableMap(commands.hgetAll(prepareNamespace));
    +                NavigableMap<byte[], byte[]> pendingCommitMap = createPendingCommitMap();
    +                pendingCommitMap.putAll(commands.hgetAll(prepareNamespace));
    +                pendingCommit = Maps.unmodifiableNavigableMap(pendingCommitMap);
                 } else {
                     LOG.debug("No previously prepared commits.");
    -                pendingCommit = Collections.emptyMap();
    +                pendingCommit = createPendingCommitMap();
    --- End diff --
    
    Good point, will address. By the way, I'll try to avoid JDK 8-only APIs for now, unless the change applies only to the master branch.


[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r124185192
  
    --- Diff: docs/State-checkpointing.md ---
    @@ -70,6 +70,46 @@ json config with the following properties.
         }
     }
     ```
    + 
    +For Redis Cluster state this is a json config with the following properties.
    + 
    +```
    + {
    +   "keyClass": "Optional fully qualified class name of the Key type.",
    +   "valueClass": "Optional fully qualified class name of the Value type.",
    +   "keySerializerClass": "Optional Key serializer implementation class.",
    +   "valueSerializerClass": "Optional Value Serializer implementation class.",
    +   "jedisClusterConfig": {
    +     "nodes": ["localhost:7379", "localhost:7380", "localhost:7381"],
    +     "timeout": 2000,
    +     "maxRedirections": 5
    +   }
    + }
    +```
    +
    +NOTE: If you used Redis state with Storm version 1.1.0 or earlier, you will also need to migrate your state, since the representation of state has changed  
    +from Base64-encoded strings to binary to reduce the large encoding overhead. Storm provides a migration tool to help, located in the `storm-redis-examples` module.
    +
    +Please download the source from the download page or clone the project, and run the commands below:
    +
    +```
    +mvn clean install -DskipTests
    +cd examples/storm-redis-examples
    +<storm-installation-dir>/bin/storm jar target/storm-redis-examples-*.jar org.apache.storm.redis.tools.Base64ToBinaryStateMigrationUtil [options]
    +```
    +
    +Supported options are listed here:
    +
    +```
    + -d,--dbnum <arg>       Redis DB number (default: 0)
    + -h,--host <arg>        Redis hostname (default: localhost)
    + -n,--namespace <arg>   REQUIRED the list of namespace to migrate.
    + -p,--port <arg>        Redis port (default: 6379)
    +    --password <arg>    Redis password (default: no password)
    +```
    +
    +You can provide multiple `namespace` options to migrate multiple namespaces at once. Other options are not mandatory.
    --- End diff --
    
    Yes, it would be nice to add that. Will add.


[GitHub] storm issue #1950: STORM-2369 [storm-redis] Use binary type for State manage...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1950
  
    @arunmahadevan 
    I also updated the doc to explain how to use the migration tool, and squashed the commits.
    Please review again. Thanks in advance.


[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r124184421
  
    --- Diff: docs/State-checkpointing.md ---
    @@ -70,6 +70,46 @@ json config with the following properties.
         }
     }
     ```
    + 
    +For Redis Cluster state this is a json config with the following properties.
    + 
    +```
    + {
    +   "keyClass": "Optional fully qualified class name of the Key type.",
    +   "valueClass": "Optional fully qualified class name of the Value type.",
    +   "keySerializerClass": "Optional Key serializer implementation class.",
    +   "valueSerializerClass": "Optional Value Serializer implementation class.",
    +   "jedisClusterConfig": {
    +     "nodes": ["localhost:7379", "localhost:7380", "localhost:7381"],
    +     "timeout": 2000,
    +     "maxRedirections": 5
    +   }
    + }
    +```
    +
    +NOTE: If you used Redis state with Storm version 1.1.0 or earlier, you will also need to migrate your state, since the representation of state has changed  
    --- End diff --
    
    I think it will throw an exception because the state will not be decoded properly, but for now there's no code to determine the state version. Adding a state version to the checkpoint spout state might be one way to do this. I'll look further into how we can handle this.


[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r123958025
  
    --- Diff: docs/State-checkpointing.md ---
    @@ -70,6 +70,46 @@ json config with the following properties.
         }
     }
     ```
    + 
    +For Redis Cluster state this is a json config with the following properties.
    + 
    +```
    + {
    +   "keyClass": "Optional fully qualified class name of the Key type.",
    +   "valueClass": "Optional fully qualified class name of the Value type.",
    +   "keySerializerClass": "Optional Key serializer implementation class.",
    +   "valueSerializerClass": "Optional Value Serializer implementation class.",
    +   "jedisClusterConfig": {
    +     "nodes": ["localhost:7379", "localhost:7380", "localhost:7381"],
    +     "timeout": 2000,
    +     "maxRedirections": 5
    +   }
    + }
    +```
    +
    +NOTE: If you used Redis state with Storm version 1.1.0 or earlier, you will also need to migrate your state, since the representation of state has changed  
    +from Base64-encoded strings to binary to reduce the large encoding overhead. Storm provides a migration tool to help, located in the `storm-redis-examples` module.
    +
    +Please download the source from the download page or clone the project, and run the commands below:
    +
    +```
    +mvn clean install -DskipTests
    +cd examples/storm-redis-examples
    +<storm-installation-dir>/bin/storm jar target/storm-redis-examples-*.jar org.apache.storm.redis.tools.Base64ToBinaryStateMigrationUtil [options]
    +```
    +
    +Supported options are listed here:
    +
    +```
    + -d,--dbnum <arg>       Redis DB number (default: 0)
    + -h,--host <arg>        Redis hostname (default: localhost)
    + -n,--namespace <arg>   REQUIRED the list of namespace to migrate.
    + -p,--port <arg>        Redis port (default: 6379)
    +    --password <arg>    Redis password (default: no password)
    +```
    +
    +You can provide multiple `namespace` options to migrate multiple namespaces at once. Other options are not mandatory.
    --- End diff --
    
    It may be better to show an example of the command with a couple of namespaces.
    



[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r123979462
  
    --- Diff: external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java ---
    @@ -266,10 +288,10 @@ public void rollback() {
                     LOG.debug("hmset txidNamespace {}, txIds {}", txidNamespace, txIds);
                     commands.hmset(txidNamespace, txIds);
                 }
    -            pendingCommit = Collections.emptyMap();
    -            pendingPrepare = new ConcurrentHashMap<>();
    +            pendingCommit = createPendingCommitMap();
    --- End diff --
    
    nit: this can be `Collections.emptyNavigableMap()` (Java 8) or a static final variable initialized with an empty TreeMap.
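A sketch of the static-final option, assuming Java 8 (so it would suit master rather than the Java 7-compatible 1.x branch, where Guava's `Maps.unmodifiableNavigableMap` or a fresh `TreeMap` per reset, as in the diff, would be needed). One caveat worth checking before choosing `Collections.emptyNavigableMap()`: it is backed by a `TreeMap` with natural ordering, and `TreeMap` casts lookup keys to `Comparable`, so `containsKey` or `get` with a `byte[]` key throws `ClassCastException` even on the empty map. Keeping the byte-array comparator on the shared empty instance avoids this. `EmptyPendingCommit` and `BYTES_ORDER` are illustrative names.

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.NavigableMap;
import java.util.TreeMap;

final class EmptyPendingCommit {
    // Unsigned lexicographic ordering for byte[] keys (illustrative copy of the pending-map comparator).
    static final Comparator<byte[]> BYTES_ORDER = new Comparator<byte[]>() {
        @Override
        public int compare(byte[] a, byte[] b) {
            int n = Math.min(a.length, b.length);
            for (int i = 0; i < n; i++) {
                int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
                if (diff != 0) {
                    return diff;
                }
            }
            return a.length - b.length;
        }
    };

    /** Shared read-only empty map; it carries the comparator, so byte[] lookups are safe. */
    static final NavigableMap<byte[], byte[]> EMPTY =
            Collections.unmodifiableNavigableMap(new TreeMap<byte[], byte[]>(BYTES_ORDER));

    /** Returns true if a byte[] lookup fails on the JDK's natural-ordering empty map. */
    static boolean naturalOrderingRejectsByteArrays() {
        NavigableMap<byte[], byte[]> natural = Collections.emptyNavigableMap();
        try {
            natural.containsKey(new byte[] {1});
            return false;
        } catch (ClassCastException e) {
            return true; // byte[] is not Comparable
        }
    }
}
```

Whether this caveat bites depends on whether `pendingCommit` is ever queried by key while empty; the `get` path of `RedisKeyValueState` is not shown in this thread.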


[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r123979324
  
    --- Diff: external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java ---
    @@ -187,70 +206,73 @@ public void prepareCommit(long txid) {
                     LOG.debug("Nothing to save for prepareCommit, txid {}.", txid);
                 }
                 txIds.put(PREPARE_TXID_KEY, String.valueOf(txid));
    +
                 commands.hmset(txidNamespace, txIds);
    -            pendingCommit = Collections.unmodifiableMap(currentPending);
    +            pendingCommit = Maps.unmodifiableNavigableMap(currentPending);
             } finally {
    -            jedisContainer.returnInstance(commands);
    +            container.returnInstance(commands);
             }
         }
     
         @Override
         public void commit(long txid) {
             LOG.debug("commit txid {}", txid);
             validateCommitTxid(txid);
    -        JedisCommands commands = null;
    +        RedisCommands commands = null;
             try {
    -            commands = jedisContainer.getInstance();
    +            commands = container.getInstance();
                 if (!pendingCommit.isEmpty()) {
    -                List<String> keysToDelete = new ArrayList<>();
    -                Map<String, String> keysToAdd = new HashMap<>();
    -                for(Map.Entry<String, String> entry: pendingCommit.entrySet()) {
    -                    if (RedisEncoder.TOMBSTONE.equals(entry.getValue())) {
    -                        keysToDelete.add(entry.getKey());
    +                List<byte[]> keysToDelete = new ArrayList<>();
    +                Map<byte[], byte[]> keysToAdd = new HashMap<>();
    +                for(Map.Entry<byte[], byte[]> entry: pendingCommit.entrySet()) {
    +                    byte[] key = entry.getKey();
    +                    byte[] value = entry.getValue();
    +                    if (Arrays.equals(DefaultStateEncoder.TOMBSTONE, value)) {
    +                        keysToDelete.add(key);
                         } else {
    -                        keysToAdd.put(entry.getKey(), entry.getValue());
    +                        keysToAdd.put(key, value);
                         }
                     }
                     if (!keysToAdd.isEmpty()) {
                         commands.hmset(namespace, keysToAdd);
                     }
                     if (!keysToDelete.isEmpty()) {
    -                    commands.hdel(namespace, keysToDelete.toArray(new String[0]));
    +                    commands.hdel(namespace, keysToDelete.toArray(new byte[0][]));
                     }
                 } else {
                     LOG.debug("Nothing to save for commit, txid {}.", txid);
                 }
                 txIds.put(COMMIT_TXID_KEY, String.valueOf(txid));
                 commands.hmset(txidNamespace, txIds);
                 commands.del(prepareNamespace);
    -            pendingCommit = Collections.emptyMap();
    +            pendingCommit = createPendingCommitMap();
    --- End diff --
    
    nit: this can be `Collections.emptyNavigableMap()` (Java 8) or a static final variable initialized with an empty TreeMap.
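The `commit` diff above splits `pendingCommit` into fields to write and fields to delete. Two details matter once values are binary: the tombstone has to be detected with `Arrays.equals` (plain `equals` on arrays compares identity, and values read back from Redis are fresh arrays), and the delete list is converted with `toArray(new byte[0][])` to produce the `byte[][]` that the varargs `hdel(byte[], byte[]...)` call in the diff expects. A standalone sketch of that partitioning follows; `CommitBatch` is hypothetical, and since `DefaultStateEncoder.TOMBSTONE` is not shown, the tombstone is simply a parameter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical holder for the two halves of a commit: fields to write and fields to delete. */
final class CommitBatch {
    final Map<byte[], byte[]> keysToAdd = new HashMap<>();
    final List<byte[]> keysToDelete = new ArrayList<>();

    static CommitBatch partition(Map<byte[], byte[]> pendingCommit, byte[] tombstone) {
        CommitBatch batch = new CommitBatch();
        for (Map.Entry<byte[], byte[]> entry : pendingCommit.entrySet()) {
            // Content comparison: the tombstone value is usually a different array instance.
            if (Arrays.equals(tombstone, entry.getValue())) {
                batch.keysToDelete.add(entry.getKey());
            } else {
                // Keys were already de-duplicated by pendingCommit, and this map is only
                // iterated (e.g. by hmset), so identity-based byte[] keys are acceptable here.
                batch.keysToAdd.put(entry.getKey(), entry.getValue());
            }
        }
        return batch;
    }

    /** Array form suitable for a varargs hdel(byte[] key, byte[]... fields) call. */
    byte[][] deleteFields() {
        return keysToDelete.toArray(new byte[0][]);
    }
}
```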


[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/1950


[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r109635740
  
    --- Diff: external/storm-redis/src/main/java/org/apache/storm/redis/utils/ByteArrayUtil.java ---
    @@ -0,0 +1,61 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.redis.utils;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +public class ByteArrayUtil {
    +    private ByteArrayUtil() {
    +    }
    +
    +    public static class Maps {
    +        public static Map<ByteArrayWrapper, byte[]> newHashMapWrappingKey(Map<byte[], byte[]> source) {
    +            Map<ByteArrayWrapper, byte[]> destination = new HashMap<>(source.size());
    +            wrapEntities(source, destination);
    +            return destination;
    +        }
    +
    +        public static Map<ByteArrayWrapper, byte[]> newConcurrentHashMapWrappingKey(Map<byte[], byte[]> source) {
    --- End diff --
    
    Is this used anywhere?
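
For context on why these Map helpers exist: Java arrays inherit identity-based `equals`/`hashCode`, so two `byte[]` with the same contents are different `HashMap` keys. Below is a minimal sketch of a content-based wrapper in the spirit of the PR's `ByteArrayWrapper`, whose actual implementation is not shown in this diff; `BytesKey` and `wrapKeys` are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical content-based key wrapper; the PR's ByteArrayWrapper may differ.
final class BytesKey {
    private final byte[] data;

    BytesKey(byte[] data) {
        this.data = data; // not copied: callers must not mutate the array afterwards
    }

    byte[] get() {
        return data;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof BytesKey && Arrays.equals(data, ((BytesKey) o).data);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(data);
    }

    // Copies entries into a HashMap keyed by array content rather than array identity.
    static Map<BytesKey, byte[]> wrapKeys(Map<byte[], byte[]> source) {
        Map<BytesKey, byte[]> destination = new HashMap<>(source.size());
        for (Map.Entry<byte[], byte[]> e : source.entrySet()) {
            destination.put(new BytesKey(e.getKey()), e.getValue());
        }
        return destination;
    }
}
```

An alternative that avoids wrapping is a sorted map or set with a content comparator, which is what the diff does elsewhere with Guava's `UnsignedBytes.lexicographicalComparator()`.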


---

[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r109653583
  
    --- Diff: external/storm-redis/src/main/java/org/apache/storm/redis/common/commands/RedisCommands.java ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.redis.common.commands;
    +
    +import redis.clients.jedis.ScanParams;
    +import redis.clients.jedis.ScanResult;
    +
    +import java.util.Map;
    +
    +/**
    + * This interface represents Jedis methods exhaustively which are used on storm-redis.
    + *
    + * This is a workaround since Jedis and JedisCluster doesn't implement same interface for binary type of methods,
    + * and unify binary methods and string methods into one interface.
    + */
    +public interface RedisCommands {
    +    // binary, common
    +    Boolean exists(byte[] key);
    +
    +    Long del(byte[] key);
    +
    +    // binary, hash
    +    byte[] hget(byte[] key, byte[] field);
    +
    +    String hmset(byte[] key, Map<byte[], byte[]> fieldValues);
    +
    +    Map<byte[], byte[]> hgetAll(byte[] key);
    +
    +    Long hdel(byte[] key, byte[]... fields);
    +
    +    ScanResult<Map.Entry<byte[], byte[]>> hscan(byte[] key, byte[] cursor, ScanParams params);
    +
    +    // string, common
    +    boolean exists(String key);
    +
    +    // string, hash
    +    Map<String,String> hgetAll(String key);
    +
    +    String hmset(String key, Map<String, String> fieldValues);
    --- End diff --
    
    Same as above.


---

[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r123982153
  
    --- Diff: storm-core/src/jvm/org/apache/storm/state/BaseBinaryStateIterator.java ---
    @@ -0,0 +1,162 @@
    +package org.apache.storm.state;
    +
    +import com.google.common.collect.Iterators;
    +import com.google.common.collect.PeekingIterator;
    +import com.google.common.primitives.UnsignedBytes;
    +
    +import java.util.AbstractMap;
    +import java.util.Arrays;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.NoSuchElementException;
    +import java.util.Set;
    +import java.util.TreeSet;
    +
    +/**
    + * Base implementation of iterator over {@link KeyValueState} which is based on binary type.
    + */
    +public abstract class BaseBinaryStateIterator<K, V> implements Iterator<Map.Entry<K, V>> {
    +
    +  private final PeekingIterator<Map.Entry<byte[], byte[]>> pendingPrepareIterator;
    +  private final PeekingIterator<Map.Entry<byte[], byte[]>> pendingCommitIterator;
    +  private final Set<byte[]> providedKeys;
    +
    +  private boolean firstLoad = true;
    +  private PeekingIterator<Map.Entry<byte[], byte[]>> pendingIterator;
    +  private PeekingIterator<Map.Entry<byte[], byte[]>> cachedResultIterator;
    +
    +  /**
    +   * Constructor.
    +   *
    +   * @param pendingPrepareIterator The iterator of pendingPrepare
    +   * @param pendingCommitIterator The iterator of pendingCommit
    +   */
    +  public BaseBinaryStateIterator(Iterator<Map.Entry<byte[], byte[]>> pendingPrepareIterator,
    +      Iterator<Map.Entry<byte[], byte[]>> pendingCommitIterator) {
    +    this.pendingPrepareIterator = Iterators.peekingIterator(pendingPrepareIterator);
    +    this.pendingCommitIterator = Iterators.peekingIterator(pendingCommitIterator);
    +    this.providedKeys = new TreeSet<>(UnsignedBytes.lexicographicalComparator());
    +  }
    +
    +  @Override
    +  public boolean hasNext() {
    +    if (seekToAvailableEntry(pendingPrepareIterator)) {
    +      pendingIterator = pendingPrepareIterator;
    +      return true;
    +    }
    +
    +    if (seekToAvailableEntry(pendingCommitIterator)) {
    +      pendingIterator = pendingCommitIterator;
    +      return true;
    +    }
    +
    +
    +    if (firstLoad) {
    +      // load the first part of entries
    +      fillCachedResultIterator();
    +      firstLoad = false;
    +    }
    +
    +    while (true) {
    +      if (seekToAvailableEntry(cachedResultIterator)) {
    +        pendingIterator = cachedResultIterator;
    +        return true;
    +      }
    +
    +      if (isEndOfDataFromStorage()) {
    +        break;
    +      }
    +
    +      fillCachedResultIterator();
    +    }
    +
    +    pendingIterator = null;
    +    return false;
    +  }
    +
    +  private void fillCachedResultIterator() {
    +    Iterator<Map.Entry<byte[], byte[]>> iterator = loadChunkFromStateStorage();
    +    if (iterator != null) {
    +      cachedResultIterator = Iterators.peekingIterator(iterator);
    +    } else {
    +      cachedResultIterator = null;
    +    }
    +  }
    +
    +  @Override
    +  public Map.Entry<K, V> next() {
    +    if (!hasNext()) {
    +      throw new NoSuchElementException();
    +    }
    +
    +    Map.Entry<byte[], byte[]> keyValue = pendingIterator.next();
    +
    +    K key = decodeKey(keyValue.getKey());
    +    V value = decodeValue(keyValue.getValue());
    +
    +    providedKeys.add(keyValue.getKey());
    +    return new AbstractMap.SimpleEntry(key, value);
    +  }
    +
    +  @Override
    +  public void remove() {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  /**
    +   * Load some part of state KVs from storage and returns iterator of cached data from storage.
    +   *
    +   * @return Iterator of loaded state KVs
    +   */
    +  protected abstract Iterator<Map.Entry<byte[],byte[]>> loadChunkFromStateStorage();
    +
    +  /**
    +   * Check whether end of data is reached from storage state KVs.
    +   *
    +   * @return whether end of data is reached from storage state KVs
    +   */
    +  protected abstract boolean isEndOfDataFromStorage();
    +
    +  /**
    +   * Decode key to convert byte array to state key type.
    +   *
    +   * @param key byte array encoded key
    +   * @return Decoded value of key
    +   */
    +  protected abstract K decodeKey(byte[] key);
    +
    +  /**
    +   * Decode value to convert byte array to state value type.
    +   *
    +   * @param value byte array encoded value
    +   * @return Decoded value of value
    +   */
    +  protected abstract V decodeValue(byte[] value);
    --- End diff --
    
    Can we let the subclass deal with the specifics of decoding, rather than assuming the data is encoded as `byte[]`?
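
One possible shape for this suggestion, sketched under assumptions: the base iterator is generic over the raw storage entry type `R` and delegates conversion to an abstract `decode` hook, so a `byte[]`-backed store and a string-backed store could share it. `DecodingIterator`, `decode`, and `Base64StringIterator` are hypothetical names, not part of the PR, and the Base64/UTF-8 store is an illustrative assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.Base64;
import java.util.Iterator;
import java.util.Map;

// Hypothetical base: iterates raw storage entries of type R and lets subclasses decode them.
abstract class DecodingIterator<R, K, V> implements Iterator<Map.Entry<K, V>> {
    private final Iterator<R> raw;

    DecodingIterator(Iterator<R> raw) {
        this.raw = raw;
    }

    // Subclasses decide how a raw storage entry maps to a typed key/value pair.
    protected abstract Map.Entry<K, V> decode(R rawEntry);

    @Override
    public boolean hasNext() {
        return raw.hasNext();
    }

    @Override
    public Map.Entry<K, V> next() {
        // raw.next() throws NoSuchElementException when exhausted.
        return decode(raw.next());
    }
}

// Hypothetical subclass for a store that keeps Base64-encoded UTF-8 strings.
class Base64StringIterator extends DecodingIterator<Map.Entry<String, String>, String, String> {
    Base64StringIterator(Iterator<Map.Entry<String, String>> raw) {
        super(raw);
    }

    @Override
    protected Map.Entry<String, String> decode(Map.Entry<String, String> e) {
        Base64.Decoder d = Base64.getDecoder();
        String k = new String(d.decode(e.getKey()), StandardCharsets.UTF_8);
        String v = new String(d.decode(e.getValue()), StandardCharsets.UTF_8);
        return new AbstractMap.SimpleImmutableEntry<>(k, v);
    }
}
```

The trade-off is that the current base class tracks `providedKeys` as `byte[]` with a fixed comparator; a generic version would need the subclass to also supply how raw keys are compared.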


---

[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r109814710
  
    --- Diff: external/storm-redis/src/main/java/org/apache/storm/redis/common/commands/RedisCommands.java ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.redis.common.commands;
    +
    +import redis.clients.jedis.ScanParams;
    +import redis.clients.jedis.ScanResult;
    +
    +import java.util.Map;
    +
    +/**
    + * This interface represents Jedis methods exhaustively which are used on storm-redis.
    + *
    + * This is a workaround since Jedis and JedisCluster doesn't implement same interface for binary type of methods,
    + * and unify binary methods and string methods into one interface.
    + */
    +public interface RedisCommands {
    +    // binary, common
    +    Boolean exists(byte[] key);
    +
    +    Long del(byte[] key);
    +
    +    // binary, hash
    +    byte[] hget(byte[] key, byte[] field);
    +
    +    String hmset(byte[] key, Map<byte[], byte[]> fieldValues);
    +
    +    Map<byte[], byte[]> hgetAll(byte[] key);
    +
    +    Long hdel(byte[] key, byte[]... fields);
    +
    +    ScanResult<Map.Entry<byte[], byte[]>> hscan(byte[] key, byte[] cursor, ScanParams params);
    +
    +    // string, common
    +    boolean exists(String key);
    --- End diff --
    
    Good suggestion. I'll group them per command.
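
A sketch of the per-command grouping agreed to here, with each command's binary and string overloads placed next to each other. Only a subset of the methods from the diff is shown, the Jedis-typed `hscan` is left out so the block compiles without Jedis, and `GroupedCommands` plus the in-memory `InMemoryCommands` test double (whose string overloads delegate to the binary ones via UTF-8) are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical excerpt: overloads grouped per command instead of per key type.
interface GroupedCommands {
    // exists
    Boolean exists(byte[] key);

    boolean exists(String key);

    // hmset
    String hmset(byte[] key, Map<byte[], byte[]> fieldValues);

    String hmset(String key, Map<String, String> fieldValues);
}

// Hypothetical in-memory test double; not a Redis client.
class InMemoryCommands implements GroupedCommands {
    private final Map<String, Map<String, byte[]>> hashes = new HashMap<>();

    // ISO-8859-1 maps each byte to one char, giving a lossless String key for arbitrary bytes.
    private static String k(byte[] b) {
        return new String(b, StandardCharsets.ISO_8859_1);
    }

    public Boolean exists(byte[] key) {
        return hashes.containsKey(k(key));
    }

    public boolean exists(String key) {
        return exists(key.getBytes(StandardCharsets.UTF_8));
    }

    public String hmset(byte[] key, Map<byte[], byte[]> fieldValues) {
        Map<String, byte[]> hash = hashes.computeIfAbsent(k(key), x -> new HashMap<>());
        fieldValues.forEach((f, v) -> hash.put(k(f), v));
        return "OK";
    }

    public String hmset(String key, Map<String, String> fieldValues) {
        Map<byte[], byte[]> binary = new HashMap<>();
        fieldValues.forEach((f, v) -> binary.put(f.getBytes(StandardCharsets.UTF_8), v.getBytes(StandardCharsets.UTF_8)));
        return hmset(key.getBytes(StandardCharsets.UTF_8), binary);
    }
}
```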


---

[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r123982139
  
    --- Diff: storm-core/src/jvm/org/apache/storm/state/BaseBinaryStateIterator.java ---
    @@ -0,0 +1,162 @@
    +package org.apache.storm.state;
    +
    +import com.google.common.collect.Iterators;
    +import com.google.common.collect.PeekingIterator;
    +import com.google.common.primitives.UnsignedBytes;
    +
    +import java.util.AbstractMap;
    +import java.util.Arrays;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.NoSuchElementException;
    +import java.util.Set;
    +import java.util.TreeSet;
    +
    +/**
    + * Base implementation of iterator over {@link KeyValueState} which is based on binary type.
    + */
    +public abstract class BaseBinaryStateIterator<K, V> implements Iterator<Map.Entry<K, V>> {
    +
    +  private final PeekingIterator<Map.Entry<byte[], byte[]>> pendingPrepareIterator;
    +  private final PeekingIterator<Map.Entry<byte[], byte[]>> pendingCommitIterator;
    +  private final Set<byte[]> providedKeys;
    +
    +  private boolean firstLoad = true;
    +  private PeekingIterator<Map.Entry<byte[], byte[]>> pendingIterator;
    +  private PeekingIterator<Map.Entry<byte[], byte[]>> cachedResultIterator;
    +
    +  /**
    +   * Constructor.
    +   *
    +   * @param pendingPrepareIterator The iterator of pendingPrepare
    +   * @param pendingCommitIterator The iterator of pendingCommit
    +   */
    +  public BaseBinaryStateIterator(Iterator<Map.Entry<byte[], byte[]>> pendingPrepareIterator,
    +      Iterator<Map.Entry<byte[], byte[]>> pendingCommitIterator) {
    +    this.pendingPrepareIterator = Iterators.peekingIterator(pendingPrepareIterator);
    +    this.pendingCommitIterator = Iterators.peekingIterator(pendingCommitIterator);
    +    this.providedKeys = new TreeSet<>(UnsignedBytes.lexicographicalComparator());
    +  }
    +
    +  @Override
    +  public boolean hasNext() {
    +    if (seekToAvailableEntry(pendingPrepareIterator)) {
    +      pendingIterator = pendingPrepareIterator;
    +      return true;
    +    }
    +
    +    if (seekToAvailableEntry(pendingCommitIterator)) {
    +      pendingIterator = pendingCommitIterator;
    +      return true;
    +    }
    +
    +
    +    if (firstLoad) {
    +      // load the first part of entries
    +      fillCachedResultIterator();
    +      firstLoad = false;
    +    }
    +
    +    while (true) {
    +      if (seekToAvailableEntry(cachedResultIterator)) {
    +        pendingIterator = cachedResultIterator;
    +        return true;
    +      }
    +
    +      if (isEndOfDataFromStorage()) {
    +        break;
    +      }
    +
    +      fillCachedResultIterator();
    +    }
    +
    +    pendingIterator = null;
    +    return false;
    +  }
    +
    +  private void fillCachedResultIterator() {
    +    Iterator<Map.Entry<byte[], byte[]>> iterator = loadChunkFromStateStorage();
    +    if (iterator != null) {
    +      cachedResultIterator = Iterators.peekingIterator(iterator);
    +    } else {
    +      cachedResultIterator = null;
    +    }
    +  }
    +
    +  @Override
    +  public Map.Entry<K, V> next() {
    +    if (!hasNext()) {
    +      throw new NoSuchElementException();
    +    }
    +
    +    Map.Entry<byte[], byte[]> keyValue = pendingIterator.next();
    +
    +    K key = decodeKey(keyValue.getKey());
    +    V value = decodeValue(keyValue.getValue());
    +
    +    providedKeys.add(keyValue.getKey());
    +    return new AbstractMap.SimpleEntry(key, value);
    +  }
    +
    +  @Override
    +  public void remove() {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  /**
    +   * Load some part of state KVs from storage and returns iterator of cached data from storage.
    +   *
    +   * @return Iterator of loaded state KVs
    +   */
    +  protected abstract Iterator<Map.Entry<byte[],byte[]>> loadChunkFromStateStorage();
    +
    +  /**
    +   * Check whether end of data is reached from storage state KVs.
    +   *
    +   * @return whether end of data is reached from storage state KVs
    +   */
    +  protected abstract boolean isEndOfDataFromStorage();
    +
    +  /**
    +   * Decode key to convert byte array to state key type.
    +   *
    +   * @param key byte array encoded key
    +   * @return Decoded value of key
    +   */
    +  protected abstract K decodeKey(byte[] key);
    --- End diff --
    
    Can we let the subclass deal with the specifics of decoding, rather than assuming the data is encoded as `byte[]`?
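
For readers following this iterator: `hasNext()` drains `pendingPrepare`, then `pendingCommit`, then storage chunks, and `next()` records each returned key in `providedKeys`, a `TreeSet` with an unsigned lexicographic comparator (needed because `byte[]` is not `Comparable`). `seekToAvailableEntry` is not shown in this diff; the sketch below assumes it skips entries whose key was already provided, so the newest layer shadows older ones. It is a simplified, eager, single-chunk illustration with hypothetical names (`LayeredKeys`, `mergedValues`) and a hand-written comparator standing in for Guava's `UnsignedBytes.lexicographicalComparator()`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class LayeredKeys {
    // Unsigned lexicographic byte[] ordering (0x80 sorts after 0x01).
    static final Comparator<byte[]> UNSIGNED = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = (a[i] & 0xff) - (b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return a.length - b.length;
    };

    // Returns values in iteration order, skipping keys already provided by an earlier (newer) layer.
    @SafeVarargs
    static List<byte[]> mergedValues(Iterator<Map.Entry<byte[], byte[]>>... layers) {
        Set<byte[]> providedKeys = new TreeSet<>(UNSIGNED);
        List<byte[]> out = new ArrayList<>();
        for (Iterator<Map.Entry<byte[], byte[]>> layer : layers) {
            while (layer.hasNext()) {
                Map.Entry<byte[], byte[]> e = layer.next();
                // add() returns false when an equal key was already provided.
                if (providedKeys.add(e.getKey())) {
                    out.add(e.getValue());
                }
            }
        }
        return out;
    }
}
```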


---

[GitHub] storm issue #1950: STORM-2369 [storm-redis] Use binary type for State manage...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1950
  
    We might need to provide a migration tool that converts Base64-encoded states to the representation used by the current implementation.
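
A minimal sketch of the per-entry conversion such a tool would need, assuming the old state stored each hash field and value as a Base64 string (the PR description mentions removing Base64 encode/decode from RedisEncoder). Only the in-memory transformation is shown; reading and writing the Redis hashes is left out, and `Base64StateMigration`/`toBinary` are hypothetical names. The result has the same `Map<byte[], byte[]>` shape as the binary `hmset` in `RedisCommands`.

```java
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; not the PR's Base64ToBinaryStateMigrationUtil.
final class Base64StateMigration {
    private Base64StateMigration() {
    }

    // Decodes each Base64 field and value of an old-format hash into raw bytes.
    static Map<byte[], byte[]> toBinary(Map<String, String> base64Hash) {
        Base64.Decoder decoder = Base64.getDecoder();
        // Identity-keyed map is fine here: it is only iterated and written out, never looked up.
        Map<byte[], byte[]> binary = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : base64Hash.entrySet()) {
            binary.put(decoder.decode(e.getKey()), decoder.decode(e.getValue()));
        }
        return binary;
    }
}
```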


---

[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r124484454
  
    --- Diff: external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java ---
    @@ -316,10 +340,18 @@ private Long lastPreparedTxid() {
     
         private Long lastId(String key) {
             Long lastId = null;
    -        String str = txIds.get(key);
    -        if (str != null) {
    -            lastId = Long.valueOf(str);
    +        String txId = txIds.get(key);
    +        if (txId != null) {
    +            lastId = Long.valueOf(txId);
             }
             return lastId;
         }
    +
    +    private ConcurrentNavigableMap<byte[], byte[]> createPendingPrepareMap() {
    +        return new ConcurrentSkipListMap<>(UnsignedBytes.lexicographicalComparator());
    +    }
    +
    +    private NavigableMap<byte[], byte[]> createPendingCommitMap() {
    --- End diff --
    
    This seems to be unused. If so, it can be removed.
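
Why `createPendingPrepareMap()` in this hunk passes a comparator: `byte[]` does not implement `Comparable`, so a `ConcurrentSkipListMap<byte[], byte[]>` built without one throws `ClassCastException` once it has to compare keys, while a content comparator also makes lookups match by value. A minimal sketch with a hand-written unsigned comparator standing in for Guava's `UnsignedBytes.lexicographicalComparator()`; `SkipListDemo` is a hypothetical name.

```java
import java.util.Comparator;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

final class SkipListDemo {
    // Unsigned lexicographic ordering: 0x80 (-128 as a signed byte) sorts after 0x01.
    static final Comparator<byte[]> UNSIGNED = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = (a[i] & 0xff) - (b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return a.length - b.length;
    };

    // Same shape as createPendingPrepareMap() in the diff, with a local comparator.
    static ConcurrentNavigableMap<byte[], byte[]> createPendingPrepareMap() {
        return new ConcurrentSkipListMap<>(UNSIGNED);
    }

    // Returns true if a comparator-less skip list rejects byte[] keys.
    static boolean rejectsWithoutComparator() {
        ConcurrentSkipListMap<byte[], byte[]> plain = new ConcurrentSkipListMap<>();
        try {
            plain.put(new byte[] {1}, new byte[0]);
            plain.put(new byte[] {2}, new byte[0]); // requires a comparison; byte[] is not Comparable
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }
}
```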


---

[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r124188554
  
    --- Diff: storm-core/src/jvm/org/apache/storm/state/BaseBinaryStateIterator.java ---
    @@ -0,0 +1,162 @@
    +package org.apache.storm.state;
    +
    +import com.google.common.collect.Iterators;
    +import com.google.common.collect.PeekingIterator;
    +import com.google.common.primitives.UnsignedBytes;
    +
    +import java.util.AbstractMap;
    +import java.util.Arrays;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.NoSuchElementException;
    +import java.util.Set;
    +import java.util.TreeSet;
    +
    +/**
    + * Base implementation of iterator over {@link KeyValueState} which is based on binary type.
    + */
    +public abstract class BaseBinaryStateIterator<K, V> implements Iterator<Map.Entry<K, V>> {
    +
    +  private final PeekingIterator<Map.Entry<byte[], byte[]>> pendingPrepareIterator;
    +  private final PeekingIterator<Map.Entry<byte[], byte[]>> pendingCommitIterator;
    +  private final Set<byte[]> providedKeys;
    +
    +  private boolean firstLoad = true;
    +  private PeekingIterator<Map.Entry<byte[], byte[]>> pendingIterator;
    +  private PeekingIterator<Map.Entry<byte[], byte[]>> cachedResultIterator;
    +
    +  /**
    +   * Constructor.
    +   *
    +   * @param pendingPrepareIterator The iterator of pendingPrepare
    +   * @param pendingCommitIterator The iterator of pendingCommit
    +   */
    +  public BaseBinaryStateIterator(Iterator<Map.Entry<byte[], byte[]>> pendingPrepareIterator,
    +      Iterator<Map.Entry<byte[], byte[]>> pendingCommitIterator) {
    +    this.pendingPrepareIterator = Iterators.peekingIterator(pendingPrepareIterator);
    +    this.pendingCommitIterator = Iterators.peekingIterator(pendingCommitIterator);
    +    this.providedKeys = new TreeSet<>(UnsignedBytes.lexicographicalComparator());
    +  }
    +
    +  @Override
    +  public boolean hasNext() {
    +    if (seekToAvailableEntry(pendingPrepareIterator)) {
    +      pendingIterator = pendingPrepareIterator;
    +      return true;
    +    }
    +
    +    if (seekToAvailableEntry(pendingCommitIterator)) {
    +      pendingIterator = pendingCommitIterator;
    +      return true;
    +    }
    +
    +
    +    if (firstLoad) {
    +      // load the first part of entries
    +      fillCachedResultIterator();
    +      firstLoad = false;
    +    }
    +
    +    while (true) {
    +      if (seekToAvailableEntry(cachedResultIterator)) {
    +        pendingIterator = cachedResultIterator;
    +        return true;
    +      }
    +
    +      if (isEndOfDataFromStorage()) {
    +        break;
    +      }
    +
    +      fillCachedResultIterator();
    +    }
    +
    +    pendingIterator = null;
    +    return false;
    +  }
    +
    +  private void fillCachedResultIterator() {
    +    Iterator<Map.Entry<byte[], byte[]>> iterator = loadChunkFromStateStorage();
    +    if (iterator != null) {
    +      cachedResultIterator = Iterators.peekingIterator(iterator);
    +    } else {
    +      cachedResultIterator = null;
    +    }
    +  }
    +
    +  @Override
    +  public Map.Entry<K, V> next() {
    +    if (!hasNext()) {
    +      throw new NoSuchElementException();
    +    }
    +
    +    Map.Entry<byte[], byte[]> keyValue = pendingIterator.next();
    +
    +    K key = decodeKey(keyValue.getKey());
    +    V value = decodeValue(keyValue.getValue());
    +
    +    providedKeys.add(keyValue.getKey());
    +    return new AbstractMap.SimpleEntry(key, value);
    +  }
    +
    +  @Override
    +  public void remove() {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  /**
    +   * Load some part of state KVs from storage and returns iterator of cached data from storage.
    +   *
    +   * @return Iterator of loaded state KVs
    +   */
    +  protected abstract Iterator<Map.Entry<byte[],byte[]>> loadChunkFromStateStorage();
    +
    +  /**
    +   * Check whether end of data is reached from storage state KVs.
    +   *
    +   * @return whether end of data is reached from storage state KVs
    +   */
    +  protected abstract boolean isEndOfDataFromStorage();
    +
    +  /**
    +   * Decode key to convert byte array to state key type.
    +   *
    +   * @param key byte array encoded key
    +   * @return Decoded value of key
    +   */
    +  protected abstract K decodeKey(byte[] key);
    +
    +  /**
    +   * Decode value to convert byte array to state value type.
    +   *
    +   * @param value byte array encoded value
    +   * @return Decoded value of value
    +   */
    +  protected abstract V decodeValue(byte[] value);
    --- End diff --
    
    Same as above.


---

[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r124291484
  
    --- Diff: docs/State-checkpointing.md ---
    @@ -70,6 +70,46 @@ json config with the following properties.
         }
     }
     ```
    + 
    +For Redis Cluster state this is a json config with the following properties.
    + 
    +```
    + {
    +   "keyClass": "Optional fully qualified class name of the Key type.",
    +   "valueClass": "Optional fully qualified class name of the Value type.",
    +   "keySerializerClass": "Optional Key serializer implementation class.",
    +   "valueSerializerClass": "Optional Value Serializer implementation class.",
    +   "jedisClusterConfig": {
    +     "nodes": ["localhost:7379", "localhost:7380", "localhost:7381"],
    +     "timeout": 2000,
    +     "maxRedirections": 5
    +   }
    + }
    +```
    +
    +NOTE: If you used Redis state with Storm version 1.1.0 or earlier, you would need to also migrate your state since the representation of state has changed  
    --- End diff --
    
    I'm trying to address this by adding a `version` field to the CheckpointState class, but I'm stuck on migrating the old checkpoint spout state to the newer one. Kryo doesn't support forward compatibility, and it didn't even seem able to read the state with a copy of the old class. I have no idea how to do this right now.
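
A minimal sketch of the general idea behind a `version` field, using plain `DataOutputStream` instead of Kryo (an assumption for illustration; it does not solve reading already-persisted, unversioned Kryo data): writing the version first lets a reader decide how to parse the remaining bytes. `VersionedCheckpoint`, its fields, the version numbers, and the `"base64"` default are hypothetical and do not reflect the real `CheckpointState` layout.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical checkpoint record; not the real CheckpointState layout.
final class VersionedCheckpoint {
    static final int CURRENT_VERSION = 2;

    final long txid;
    final String stateFormat; // hypothetical field added in version 2

    VersionedCheckpoint(long txid, String stateFormat) {
        this.txid = txid;
        this.stateFormat = stateFormat;
    }

    byte[] write() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(CURRENT_VERSION); // version first, so readers can branch on it
            out.writeLong(txid);
            out.writeUTF(stateFormat);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static VersionedCheckpoint read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            long txid = in.readLong();
            if (version == 1) {
                return new VersionedCheckpoint(txid, "base64"); // v1 predates the field: assume old format
            }
            if (version == 2) {
                return new VersionedCheckpoint(txid, in.readUTF());
            }
            throw new IllegalArgumentException("unsupported checkpoint version " + version);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

This only helps for data written after the version field exists; state persisted before that still needs a one-off conversion, which is the difficulty described above.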


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1950: STORM-2369 [storm-redis] Use binary type for State manage...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1950
  
    @arunmahadevan 
    Sorry it took me so long to address the review comments.
    
    I've addressed the comments and created a migration tool, `Base64ToBinaryStateMigrationUtil`.
    (With this patch, I was able to read state that had been migrated.)
    
    I'll describe how to use it in the docs soon.


---

[GitHub] storm issue #1950: STORM-2369 [storm-redis] Use binary type for State manage...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the issue:

    https://github.com/apache/storm/pull/1950
  
    @HeartSaVioR what is the motivation for using binary instead of string type? Is this mainly for performance? I will review this sometime next week.


---

[GitHub] storm issue #1950: STORM-2369 [storm-redis] Use binary type for State manage...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1950
  
    Rebased and updated the state iterator implementation.
    I also extracted a base implementation of the binary-based state iterator into storm-core, so that the HBase state could use it as well.
    
    @arunmahadevan Please take a look when you have time. Thanks in advance.


---

[GitHub] storm issue #1950: STORM-2369 [storm-redis] Use binary type for State manage...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1950
  
    @arunmahadevan 
    Please take a look when you have some time. Thanks in advance!


---

[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r124186710
  
    --- Diff: external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java ---
    @@ -266,10 +288,10 @@ public void rollback() {
                     LOG.debug("hmset txidNamespace {}, txIds {}", txidNamespace, txIds);
                     commands.hmset(txidNamespace, txIds);
                 }
    -            pendingCommit = Collections.emptyMap();
    -            pendingPrepare = new ConcurrentHashMap<>();
    +            pendingCommit = createPendingCommitMap();
    --- End diff --
    
    Will address.


---

[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r123979220
  
    --- Diff: external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java ---
    @@ -64,75 +75,83 @@ public RedisKeyValueState(String namespace, JedisPoolConfig poolConfig) {
         }
     
         public RedisKeyValueState(String namespace, JedisPoolConfig poolConfig, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    -        this(namespace, JedisCommandsContainerBuilder.build(poolConfig), keySerializer, valueSerializer);
    +        this(namespace, RedisCommandsContainerBuilder.build(poolConfig), keySerializer, valueSerializer);
         }
     
    -    public RedisKeyValueState(String namespace, JedisCommandsInstanceContainer jedisContainer,
    +    public RedisKeyValueState(String namespace, JedisClusterConfig jedisClusterConfig, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    +        this(namespace, RedisCommandsContainerBuilder.build(jedisClusterConfig), keySerializer, valueSerializer);
    +    }
    +
    +    public RedisKeyValueState(String namespace, RedisCommandsInstanceContainer container,
                                   Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    -        this.namespace = namespace;
    -        this.prepareNamespace = namespace + "$prepare";
    +        this.namespace = SafeEncoder.encode(namespace);
    +        this.prepareNamespace = SafeEncoder.encode(namespace + "$prepare");
             this.txidNamespace = namespace + "$txid";
    -        this.encoder = new RedisEncoder<K, V>(keySerializer, valueSerializer);
    -        this.jedisContainer = jedisContainer;
    -        this.pendingPrepare = new ConcurrentHashMap<>();
    +        this.encoder = new DefaultStateEncoder<K, V>(keySerializer, valueSerializer);
    +        this.container = container;
    +        this.pendingPrepare = createPendingPrepareMap();
             initTxids();
             initPendingCommit();
         }
     
         private void initTxids() {
    -        JedisCommands commands = null;
    +        RedisCommands commands = null;
             try {
    -            commands = jedisContainer.getInstance();
    +            commands = container.getInstance();
                 if (commands.exists(txidNamespace)) {
                     txIds = commands.hgetAll(txidNamespace);
                 } else {
                     txIds = new HashMap<>();
                 }
                 LOG.debug("initTxids, txIds {}", txIds);
             } finally {
    -            jedisContainer.returnInstance(commands);
    +            container.returnInstance(commands);
             }
         }
     
         private void initPendingCommit() {
    -        JedisCommands commands = null;
    +        RedisCommands commands = null;
             try {
    -            commands = jedisContainer.getInstance();
    +            commands = container.getInstance();
                 if (commands.exists(prepareNamespace)) {
                     LOG.debug("Loading previously prepared commit from {}", prepareNamespace);
    -                pendingCommit = Collections.unmodifiableMap(commands.hgetAll(prepareNamespace));
    +                NavigableMap<byte[], byte[]> pendingCommitMap = createPendingCommitMap();
    +                pendingCommitMap.putAll(commands.hgetAll(prepareNamespace));
    +                pendingCommit = Maps.unmodifiableNavigableMap(pendingCommitMap);
                 } else {
                     LOG.debug("No previously prepared commits.");
    -                pendingCommit = Collections.emptyMap();
    +                pendingCommit = createPendingCommitMap();
    --- End diff --
    
    nit: this could be `Collections.emptyNavigableMap()` (Java 8) or a static final variable initialized with an empty TreeMap.
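    A minimal sketch of the second option, assuming the pending maps are `NavigableMap<byte[], byte[]>` ordered by unsigned lexicographic byte comparison. The PR uses Guava's `UnsignedBytes.lexicographicalComparator()`; a hand-written comparator stands in here so the example needs only the JDK. `PendingMaps`, `BYTES_ORDER` and `EMPTY_PENDING` are hypothetical names. One possible reason to prefer this over `Collections.emptyNavigableMap()`: that map is backed by a natural-order `TreeMap`, and as far as I can tell from the JDK source, `get`/`containsKey` on such a map cast the key to `Comparable` even when it is empty, which a `byte[]` key does not implement.

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.NavigableMap;
import java.util.TreeMap;

final class PendingMaps {
    // Unsigned lexicographic order for byte[] keys (stand-in for Guava's
    // UnsignedBytes.lexicographicalComparator()).
    static final Comparator<byte[]> BYTES_ORDER = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    };

    // One shared, read-only empty map. Reusing it is safe because nobody can mutate it,
    // and lookups go through BYTES_ORDER, so byte[] keys are compared by content.
    static final NavigableMap<byte[], byte[]> EMPTY_PENDING =
            Collections.unmodifiableNavigableMap(new TreeMap<byte[], byte[]>(BYTES_ORDER));

    private PendingMaps() {
    }

    // Fresh, mutable map for when entries actually need to be collected.
    static NavigableMap<byte[], byte[]> createPendingCommitMap() {
        return new TreeMap<>(BYTES_ORDER);
    }
}
```

    With this, `pendingCommit = PendingMaps.EMPTY_PENDING;` avoids allocating a new TreeMap after every commit and rollback.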


[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r124188533
  
    --- Diff: storm-core/src/jvm/org/apache/storm/state/BaseBinaryStateIterator.java ---
    @@ -0,0 +1,162 @@
    +package org.apache.storm.state;
    +
    +import com.google.common.collect.Iterators;
    +import com.google.common.collect.PeekingIterator;
    +import com.google.common.primitives.UnsignedBytes;
    +
    +import java.util.AbstractMap;
    +import java.util.Arrays;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.NoSuchElementException;
    +import java.util.Set;
    +import java.util.TreeSet;
    +
    +/**
    + * Base implementation of iterator over {@link KeyValueState} which is based on binary type.
    + */
    +public abstract class BaseBinaryStateIterator<K, V> implements Iterator<Map.Entry<K, V>> {
    +
    +  private final PeekingIterator<Map.Entry<byte[], byte[]>> pendingPrepareIterator;
    +  private final PeekingIterator<Map.Entry<byte[], byte[]>> pendingCommitIterator;
    +  private final Set<byte[]> providedKeys;
    +
    +  private boolean firstLoad = true;
    +  private PeekingIterator<Map.Entry<byte[], byte[]>> pendingIterator;
    +  private PeekingIterator<Map.Entry<byte[], byte[]>> cachedResultIterator;
    +
    +  /**
    +   * Constructor.
    +   *
    +   * @param pendingPrepareIterator The iterator of pendingPrepare
    +   * @param pendingCommitIterator The iterator of pendingCommit
    +   */
    +  public BaseBinaryStateIterator(Iterator<Map.Entry<byte[], byte[]>> pendingPrepareIterator,
    +      Iterator<Map.Entry<byte[], byte[]>> pendingCommitIterator) {
    +    this.pendingPrepareIterator = Iterators.peekingIterator(pendingPrepareIterator);
    +    this.pendingCommitIterator = Iterators.peekingIterator(pendingCommitIterator);
    +    this.providedKeys = new TreeSet<>(UnsignedBytes.lexicographicalComparator());
    +  }
    +
    +  @Override
    +  public boolean hasNext() {
    +    if (seekToAvailableEntry(pendingPrepareIterator)) {
    +      pendingIterator = pendingPrepareIterator;
    +      return true;
    +    }
    +
    +    if (seekToAvailableEntry(pendingCommitIterator)) {
    +      pendingIterator = pendingCommitIterator;
    +      return true;
    +    }
    +
    +
    +    if (firstLoad) {
    +      // load the first part of entries
    +      fillCachedResultIterator();
    +      firstLoad = false;
    +    }
    +
    +    while (true) {
    +      if (seekToAvailableEntry(cachedResultIterator)) {
    +        pendingIterator = cachedResultIterator;
    +        return true;
    +      }
    +
    +      if (isEndOfDataFromStorage()) {
    +        break;
    +      }
    +
    +      fillCachedResultIterator();
    +    }
    +
    +    pendingIterator = null;
    +    return false;
    +  }
    +
    +  private void fillCachedResultIterator() {
    +    Iterator<Map.Entry<byte[], byte[]>> iterator = loadChunkFromStateStorage();
    +    if (iterator != null) {
    +      cachedResultIterator = Iterators.peekingIterator(iterator);
    +    } else {
    +      cachedResultIterator = null;
    +    }
    +  }
    +
    +  @Override
    +  public Map.Entry<K, V> next() {
    +    if (!hasNext()) {
    +      throw new NoSuchElementException();
    +    }
    +
    +    Map.Entry<byte[], byte[]> keyValue = pendingIterator.next();
    +
    +    K key = decodeKey(keyValue.getKey());
    +    V value = decodeValue(keyValue.getValue());
    +
    +    providedKeys.add(keyValue.getKey());
    +    return new AbstractMap.SimpleEntry(key, value);
    +  }
    +
    +  @Override
    +  public void remove() {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  /**
    +   * Load some part of state KVs from storage and returns iterator of cached data from storage.
    +   *
    +   * @return Iterator of loaded state KVs
    +   */
    +  protected abstract Iterator<Map.Entry<byte[],byte[]>> loadChunkFromStateStorage();
    --- End diff --
    
    The types of pendingPrepare, pendingCommit, and the storage must be the same, so that type is also bound to the constructor parameters. To make this generic, we would need two additional type parameters for the storage's raw key and value types. I'll see how it changes once it is generalized.
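    A rough sketch of what that generalization could look like, assuming two extra type parameters `KENC`/`VENC` for the storage's raw key and value types. `GenericStateIterator`, `loadChunk`, `decodeKey`, `decodeValue` and `isTombstone` are hypothetical names, and the skip rules (a key already returned, or deleted by a tombstone, hides older copies) are my reading of `seekToAvailableEntry`, which is not shown in the diff. The caller supplies the "seen keys" set, so a `byte[]` store can pass a comparator-backed `TreeSet` while a `String` store can use a plain `HashSet`.

```java
import java.util.AbstractMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;

// Hypothetical generalization: pending maps and storage chunks share raw types KENC/VENC.
abstract class GenericStateIterator<K, V, KENC, VENC> implements Iterator<Map.Entry<K, V>> {
    private final Iterator<Map.Entry<KENC, VENC>> pendingPrepare;
    private final Iterator<Map.Entry<KENC, VENC>> pendingCommit;
    // Keys already returned or hidden by a tombstone; must compare KENC by content.
    private final Set<KENC> seenKeys;
    private Iterator<Map.Entry<KENC, VENC>> chunk; // current storage chunk, null before first load
    private boolean storageExhausted;
    private Map.Entry<KENC, VENC> nextEntry;       // look-ahead entry, null if none found yet

    GenericStateIterator(Iterator<Map.Entry<KENC, VENC>> pendingPrepare,
                         Iterator<Map.Entry<KENC, VENC>> pendingCommit,
                         Set<KENC> seenKeys) {
        this.pendingPrepare = pendingPrepare;
        this.pendingCommit = pendingCommit;
        this.seenKeys = seenKeys;
    }

    @Override
    public boolean hasNext() {
        // Newest data first: pendingPrepare, then pendingCommit, then storage.
        if (nextEntry == null) {
            nextEntry = advance(pendingPrepare);
        }
        if (nextEntry == null) {
            nextEntry = advance(pendingCommit);
        }
        while (nextEntry == null && !storageExhausted) {
            if (chunk == null || !chunk.hasNext()) {
                chunk = loadChunk(); // null signals that storage has no more data
                if (chunk == null) {
                    storageExhausted = true;
                    break;
                }
            }
            nextEntry = advance(chunk);
        }
        return nextEntry != null;
    }

    @Override
    public Map.Entry<K, V> next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        Map.Entry<KENC, VENC> raw = nextEntry;
        nextEntry = null;
        return new AbstractMap.SimpleImmutableEntry<>(decodeKey(raw.getKey()), decodeValue(raw.getValue()));
    }

    // Returns the next entry that is neither a tombstone nor a key seen before, or null.
    private Map.Entry<KENC, VENC> advance(Iterator<Map.Entry<KENC, VENC>> it) {
        while (it.hasNext()) {
            Map.Entry<KENC, VENC> e = it.next();
            if (!seenKeys.add(e.getKey())) {
                continue; // already returned, or deleted by a newer tombstone
            }
            if (isTombstone(e.getValue())) {
                continue; // key is now marked seen, so older copies stay hidden
            }
            return e;
        }
        return null;
    }

    protected abstract Iterator<Map.Entry<KENC, VENC>> loadChunk();

    protected abstract K decodeKey(KENC raw);

    protected abstract V decodeValue(VENC raw);

    protected abstract boolean isTombstone(VENC raw);
}
```

    The Redis version would then be roughly `GenericStateIterator<K, V, byte[], byte[]>` with a `TreeSet` ordered by an unsigned byte comparator.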


[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r124218323
  
    --- Diff: storm-core/src/jvm/org/apache/storm/state/DefaultStateEncoder.java ---
    @@ -15,27 +15,23 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    -package org.apache.storm.redis.utils;
    +package org.apache.storm.state;
     
     import com.google.common.base.Optional;
     
    -import org.apache.commons.codec.binary.Base64;
    -import org.apache.storm.state.DefaultStateSerializer;
    -import org.apache.storm.state.Serializer;
    -
     /**
    - * Helper class for encoding/decoding redis key values.
    + * Helper class for encoding/decoding key values.
      */
    -public class RedisEncoder<K, V> {
    +public class DefaultStateEncoder<K, V> {
    --- End diff --
    
    HBase uses `byte[]` as its raw type, so we could use the same encoder for HBase State, too.
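    A minimal sketch of a storage-agnostic binary encoder that any `byte[]` store (Redis, HBase) could share. `Serializer` here is a local stand-in assumed to have the same `serialize`/`deserialize` shape as Storm's state serializer; `BinaryStateEncoder`, `Utf8Serializer` and the one-byte tag scheme are hypothetical, not what `DefaultStateEncoder` actually does. The tag byte is one way to guarantee that no serialized value can collide with the tombstone.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Local stand-in for a key/value serializer (assumed shape).
interface Serializer<T> {
    byte[] serialize(T obj);

    T deserialize(byte[] bytes);
}

// Hypothetical binary encoder usable by any byte[]-based store.
final class BinaryStateEncoder<K, V> {
    // A lone tag byte 1 marks a deletion; live values start with tag byte 0,
    // so no serialized value can be mistaken for the tombstone. Do not modify.
    static final byte[] TOMBSTONE = {1};
    private static final byte PRESENT = 0;

    private final Serializer<K> keySerializer;
    private final Serializer<V> valueSerializer;

    BinaryStateEncoder(Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
    }

    byte[] encodeKey(K key) {
        return keySerializer.serialize(key);
    }

    K decodeKey(byte[] raw) {
        return keySerializer.deserialize(raw);
    }

    byte[] encodeValue(V value) {
        byte[] body = valueSerializer.serialize(value);
        byte[] out = new byte[body.length + 1];
        out[0] = PRESENT;
        System.arraycopy(body, 0, out, 1, body.length);
        return out;
    }

    // Returns null for a tombstone, otherwise the deserialized value.
    V decodeValue(byte[] raw) {
        if (isTombstone(raw)) {
            return null;
        }
        return valueSerializer.deserialize(Arrays.copyOfRange(raw, 1, raw.length));
    }

    static boolean isTombstone(byte[] raw) {
        return Arrays.equals(TOMBSTONE, raw);
    }
}

// Example serializer for demonstration only.
final class Utf8Serializer implements Serializer<String> {
    public byte[] serialize(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public String deserialize(byte[] b) {
        return new String(b, StandardCharsets.UTF_8);
    }
}
```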


[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r124186699
  
    --- Diff: external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java ---
    @@ -187,70 +206,73 @@ public void prepareCommit(long txid) {
                     LOG.debug("Nothing to save for prepareCommit, txid {}.", txid);
                 }
                 txIds.put(PREPARE_TXID_KEY, String.valueOf(txid));
    +
                 commands.hmset(txidNamespace, txIds);
    -            pendingCommit = Collections.unmodifiableMap(currentPending);
    +            pendingCommit = Maps.unmodifiableNavigableMap(currentPending);
             } finally {
    -            jedisContainer.returnInstance(commands);
    +            container.returnInstance(commands);
             }
         }
     
         @Override
         public void commit(long txid) {
             LOG.debug("commit txid {}", txid);
             validateCommitTxid(txid);
    -        JedisCommands commands = null;
    +        RedisCommands commands = null;
             try {
    -            commands = jedisContainer.getInstance();
    +            commands = container.getInstance();
                 if (!pendingCommit.isEmpty()) {
    -                List<String> keysToDelete = new ArrayList<>();
    -                Map<String, String> keysToAdd = new HashMap<>();
    -                for(Map.Entry<String, String> entry: pendingCommit.entrySet()) {
    -                    if (RedisEncoder.TOMBSTONE.equals(entry.getValue())) {
    -                        keysToDelete.add(entry.getKey());
    +                List<byte[]> keysToDelete = new ArrayList<>();
    +                Map<byte[], byte[]> keysToAdd = new HashMap<>();
    +                for(Map.Entry<byte[], byte[]> entry: pendingCommit.entrySet()) {
    +                    byte[] key = entry.getKey();
    +                    byte[] value = entry.getValue();
    +                    if (Arrays.equals(DefaultStateEncoder.TOMBSTONE, value)) {
    +                        keysToDelete.add(key);
                         } else {
    -                        keysToAdd.put(entry.getKey(), entry.getValue());
    +                        keysToAdd.put(key, value);
                         }
                     }
                     if (!keysToAdd.isEmpty()) {
                         commands.hmset(namespace, keysToAdd);
                     }
                     if (!keysToDelete.isEmpty()) {
    -                    commands.hdel(namespace, keysToDelete.toArray(new String[0]));
    +                    commands.hdel(namespace, keysToDelete.toArray(new byte[0][]));
                     }
                 } else {
                     LOG.debug("Nothing to save for commit, txid {}.", txid);
                 }
                 txIds.put(COMMIT_TXID_KEY, String.valueOf(txid));
                 commands.hmset(txidNamespace, txIds);
                 commands.del(prepareNamespace);
    -            pendingCommit = Collections.emptyMap();
    +            pendingCommit = createPendingCommitMap();
    --- End diff --
    
    Will address.
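    The commit loop in this diff compares values with `Arrays.equals` rather than `equals`, because `byte[]` inherits identity equality from `Object`: a tombstone read back from Redis is a different array instance than the constant. A small sketch of that partition step; `CommitBatch` and `partition` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper mirroring the commit() loop: split pending entries into
// fields to write (HMSET) and fields to remove (HDEL).
final class CommitBatch {
    final Map<byte[], byte[]> keysToAdd = new HashMap<>();
    final List<byte[]> keysToDelete = new ArrayList<>();

    static CommitBatch partition(Map<byte[], byte[]> pendingCommit, byte[] tombstone) {
        CommitBatch batch = new CommitBatch();
        for (Map.Entry<byte[], byte[]> entry : pendingCommit.entrySet()) {
            byte[] key = entry.getKey();
            byte[] value = entry.getValue();
            // byte[].equals() is reference equality; compare contents instead.
            if (Arrays.equals(tombstone, value)) {
                batch.keysToDelete.add(key);
            } else {
                // A HashMap keyed by byte[] is acceptable here only because the keys are
                // already distinct and the map is never queried by key, just written out.
                batch.keysToAdd.put(key, value);
            }
        }
        return batch;
    }

    // Array shape expected by a varargs hdel(byte[] key, byte[]... fields).
    byte[][] deleteFields() {
        return keysToDelete.toArray(new byte[0][]);
    }
}
```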


[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r109653504
  
    --- Diff: external/storm-redis/src/main/java/org/apache/storm/redis/common/commands/RedisCommands.java ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.redis.common.commands;
    +
    +import redis.clients.jedis.ScanParams;
    +import redis.clients.jedis.ScanResult;
    +
    +import java.util.Map;
    +
    +/**
    + * This interface represents Jedis methods exhaustively which are used on storm-redis.
    + *
    + * This is a workaround since Jedis and JedisCluster doesn't implement same interface for binary type of methods,
    + * and unify binary methods and string methods into one interface.
    + */
    +public interface RedisCommands {
    +    // binary, common
    +    Boolean exists(byte[] key);
    +
    +    Long del(byte[] key);
    +
    +    // binary, hash
    +    byte[] hget(byte[] key, byte[] field);
    +
    +    String hmset(byte[] key, Map<byte[], byte[]> fieldValues);
    +
    +    Map<byte[], byte[]> hgetAll(byte[] key);
    +
    +    Long hdel(byte[] key, byte[]... fields);
    +
    +    ScanResult<Map.Entry<byte[], byte[]>> hscan(byte[] key, byte[] cursor, ScanParams params);
    +
    +    // string, common
    +    boolean exists(String key);
    +
    +    // string, hash
    +    Map<String,String> hgetAll(String key);
    --- End diff --
    
    same as above
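    The Javadoc in the quoted diff explains that `RedisCommands` exists because Jedis and JedisCluster don't share an interface for their binary methods. A side benefit of owning the interface is that it can be faked in unit tests without a Redis server. A minimal sketch, assuming a trimmed subset of the binary hash methods listed above; `BinaryHashCommands` and `InMemoryHashCommands` are hypothetical, and a production adapter would instead forward each call to a Jedis or JedisCluster instance.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;

// Trimmed, hypothetical subset of the binary methods in RedisCommands.
interface BinaryHashCommands {
    Boolean exists(byte[] key);

    Long del(byte[] key);

    byte[] hget(byte[] key, byte[] field);

    String hmset(byte[] key, Map<byte[], byte[]> fieldValues);

    Map<byte[], byte[]> hgetAll(byte[] key);

    Long hdel(byte[] key, byte[]... fields);
}

// In-memory fake for tests.
final class InMemoryHashCommands implements BinaryHashCommands {
    // Unsigned lexicographic byte[] order so lookups compare contents, not references.
    private static final Comparator<byte[]> ORDER = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    };

    private final Map<byte[], Map<byte[], byte[]>> hashes = new TreeMap<>(ORDER);

    public Boolean exists(byte[] key) {
        return hashes.containsKey(key);
    }

    public Long del(byte[] key) {
        return hashes.remove(key) == null ? 0L : 1L;
    }

    public byte[] hget(byte[] key, byte[] field) {
        Map<byte[], byte[]> hash = hashes.get(key);
        return hash == null ? null : hash.get(field);
    }

    public String hmset(byte[] key, Map<byte[], byte[]> fieldValues) {
        if (fieldValues.isEmpty()) {
            throw new IllegalArgumentException("HMSET needs at least one field");
        }
        hashes.computeIfAbsent(key, k -> new TreeMap<>(ORDER)).putAll(fieldValues);
        return "OK";
    }

    public Map<byte[], byte[]> hgetAll(byte[] key) {
        // A missing hash reads as empty; return a copy so callers cannot mutate the fake.
        Map<byte[], byte[]> copy = new TreeMap<>(ORDER);
        Map<byte[], byte[]> hash = hashes.get(key);
        if (hash != null) {
            copy.putAll(hash);
        }
        return copy;
    }

    public Long hdel(byte[] key, byte[]... fields) {
        Map<byte[], byte[]> hash = hashes.get(key);
        if (hash == null) {
            return 0L;
        }
        long removed = 0;
        for (byte[] field : fields) {
            if (hash.remove(field) != null) {
                removed++;
            }
        }
        if (hash.isEmpty()) {
            hashes.remove(key); // Redis drops a hash once its last field is gone
        }
        return removed;
    }
}
```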


[GitHub] storm issue #1950: STORM-2369 [storm-redis] Use binary type for State manage...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1950
  
    I agree to leave a note about that, and I'm OK with providing a tool to convert existing state, since state KVs are likely valuable to users. I'll try to handle that.
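    A minimal sketch of the core of such a conversion tool, assuming the old `RedisEncoder` stored each field and value as a standard Base64 string of exactly the bytes the binary format expects. If the byte layout changed as well, a real tool would decode with the old encoder and re-encode with the new one instead. `StateMigration.toBinary` is a hypothetical name, and reading the hash (HGETALL) and writing it back (HMSET) are left out.

```java
import java.util.Base64;
import java.util.Comparator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical core of a one-off migration: Base64 string hash -> raw binary hash.
final class StateMigration {
    // Unsigned lexicographic order so the result can be looked up by byte[] content.
    private static final Comparator<byte[]> ORDER = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    };

    private StateMigration() {
    }

    static NavigableMap<byte[], byte[]> toBinary(Map<String, String> base64Hash) {
        // The basic decoder throws IllegalArgumentException on characters outside the
        // Base64 alphabet, a cheap check that the hash really holds old-format data.
        // If stored strings contain line breaks, Base64.getMimeDecoder() tolerates them.
        Base64.Decoder decoder = Base64.getDecoder();
        NavigableMap<byte[], byte[]> out = new TreeMap<>(ORDER);
        for (Map.Entry<String, String> e : base64Hash.entrySet()) {
            out.put(decoder.decode(e.getKey()), decoder.decode(e.getValue()));
        }
        return out;
    }
}
```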


[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r124186379
  
    --- Diff: storm-core/src/jvm/org/apache/storm/state/DefaultStateEncoder.java ---
    @@ -15,27 +15,23 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    -package org.apache.storm.redis.utils;
    +package org.apache.storm.state;
     
     import com.google.common.base.Optional;
     
    -import org.apache.commons.codec.binary.Base64;
    -import org.apache.storm.state.DefaultStateSerializer;
    -import org.apache.storm.state.Serializer;
    -
     /**
    - * Helper class for encoding/decoding redis key values.
    + * Helper class for encoding/decoding key values.
      */
    -public class RedisEncoder<K, V> {
    +public class DefaultStateEncoder<K, V> {
    --- End diff --
    
    We already expose a Serializer for keys and values, so I'm not sure we want another interface and implementations here. Do we want an encoder for storage that doesn't support binary?


[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r123981715
  
    --- Diff: storm-core/src/jvm/org/apache/storm/state/BaseBinaryStateIterator.java ---
    @@ -0,0 +1,162 @@
    +package org.apache.storm.state;
    +
    +import com.google.common.collect.Iterators;
    +import com.google.common.collect.PeekingIterator;
    +import com.google.common.primitives.UnsignedBytes;
    +
    +import java.util.AbstractMap;
    +import java.util.Arrays;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.NoSuchElementException;
    +import java.util.Set;
    +import java.util.TreeSet;
    +
    +/**
    + * Base implementation of iterator over {@link KeyValueState} which is based on binary type.
    + */
    +public abstract class BaseBinaryStateIterator<K, V> implements Iterator<Map.Entry<K, V>> {
    +
    +  private final PeekingIterator<Map.Entry<byte[], byte[]>> pendingPrepareIterator;
    +  private final PeekingIterator<Map.Entry<byte[], byte[]>> pendingCommitIterator;
    +  private final Set<byte[]> providedKeys;
    +
    +  private boolean firstLoad = true;
    +  private PeekingIterator<Map.Entry<byte[], byte[]>> pendingIterator;
    +  private PeekingIterator<Map.Entry<byte[], byte[]>> cachedResultIterator;
    +
    +  /**
    +   * Constructor.
    +   *
    +   * @param pendingPrepareIterator The iterator of pendingPrepare
    +   * @param pendingCommitIterator The iterator of pendingCommit
    +   */
    +  public BaseBinaryStateIterator(Iterator<Map.Entry<byte[], byte[]>> pendingPrepareIterator,
    +      Iterator<Map.Entry<byte[], byte[]>> pendingCommitIterator) {
    +    this.pendingPrepareIterator = Iterators.peekingIterator(pendingPrepareIterator);
    +    this.pendingCommitIterator = Iterators.peekingIterator(pendingCommitIterator);
    +    this.providedKeys = new TreeSet<>(UnsignedBytes.lexicographicalComparator());
    +  }
    +
    +  @Override
    +  public boolean hasNext() {
    +    if (seekToAvailableEntry(pendingPrepareIterator)) {
    +      pendingIterator = pendingPrepareIterator;
    +      return true;
    +    }
    +
    +    if (seekToAvailableEntry(pendingCommitIterator)) {
    +      pendingIterator = pendingCommitIterator;
    +      return true;
    +    }
    +
    +
    +    if (firstLoad) {
    +      // load the first part of entries
    +      fillCachedResultIterator();
    +      firstLoad = false;
    +    }
    +
    +    while (true) {
    +      if (seekToAvailableEntry(cachedResultIterator)) {
    +        pendingIterator = cachedResultIterator;
    +        return true;
    +      }
    +
    +      if (isEndOfDataFromStorage()) {
    +        break;
    +      }
    +
    +      fillCachedResultIterator();
    +    }
    +
    +    pendingIterator = null;
    +    return false;
    +  }
    +
    +  private void fillCachedResultIterator() {
    +    Iterator<Map.Entry<byte[], byte[]>> iterator = loadChunkFromStateStorage();
    +    if (iterator != null) {
    +      cachedResultIterator = Iterators.peekingIterator(iterator);
    +    } else {
    +      cachedResultIterator = null;
    +    }
    +  }
    +
    +  @Override
    +  public Map.Entry<K, V> next() {
    +    if (!hasNext()) {
    +      throw new NoSuchElementException();
    +    }
    +
    +    Map.Entry<byte[], byte[]> keyValue = pendingIterator.next();
    +
    +    K key = decodeKey(keyValue.getKey());
    +    V value = decodeValue(keyValue.getValue());
    +
    +    providedKeys.add(keyValue.getKey());
    +    return new AbstractMap.SimpleEntry(key, value);
    +  }
    +
    +  @Override
    +  public void remove() {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  /**
    +   * Load some part of state KVs from storage and returns iterator of cached data from storage.
    +   *
    +   * @return Iterator of loaded state KVs
    +   */
    +  protected abstract Iterator<Map.Entry<byte[],byte[]>> loadChunkFromStateStorage();
    --- End diff --
    
    Does this assume that state implementations store data in binary format? Couldn't it return `Iterator<Map.Entry<K, V>>` instead, without assuming any specific implementation, and let subclasses do the decoding?
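    One consequence of iterating over raw `byte[]` keys is visible in the constructor above: `providedKeys` is a `TreeSet` built with `UnsignedBytes.lexicographicalComparator()`, not a `HashSet`. Because `byte[]` uses identity `equals`/`hashCode`, a `HashSet` would treat the same key read from pending state and from storage as two different keys, and the iterator would return it twice. A small JDK-only demonstration; `ProvidedKeysDemo` is hypothetical and the comparator stands in for Guava's.

```java
import java.util.Comparator;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Demonstrates why a byte[]-keyed "already returned" set needs a content comparator.
final class ProvidedKeysDemo {
    // Stand-in for Guava's UnsignedBytes.lexicographicalComparator().
    static final Comparator<byte[]> ORDER = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    };

    private ProvidedKeysDemo() {
    }

    // Counts how many of the given keys the set accepts as new.
    static int countAccepted(Set<byte[]> seen, byte[]... keys) {
        int added = 0;
        for (byte[] key : keys) {
            if (seen.add(key)) {
                added++;
            }
        }
        return added;
    }

    static int withHashSet(byte[]... keys) {
        return countAccepted(new HashSet<>(), keys);
    }

    static int withTreeSet(byte[]... keys) {
        return countAccepted(new TreeSet<>(ORDER), keys);
    }
}
```

    If subclasses decoded entries first, the set could hold `K` instead, but then correctness would depend on every user key type implementing `equals`/`hashCode` properly.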


[GitHub] storm issue #1950: STORM-2369 [storm-redis] Use binary type for State manage...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1950
  
    Tested storm-starter's StatefulTopology with Redis Cluster nodes manually.


[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r109814880
  
    --- Diff: external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java ---
    @@ -152,70 +170,72 @@ public V get(K key, V defaultValue) {
         @Override
         public V delete(K key) {
             LOG.debug("delete key '{}'", key);
    -        String redisKey = encoder.encodeKey(key);
    +        byte[] redisKey = encoder.encodeKey(key);
             V curr = get(key);
    -        pendingPrepare.put(redisKey, RedisEncoder.TOMBSTONE);
    +        pendingPrepare.put(new ByteArrayWrapper(redisKey), RedisEncoder.TOMBSTONE);
             return curr;
         }
     
         @Override
         public Iterator<Map.Entry<K, V>> iterator() {
    -        return new RedisKeyValueStateIterator<K, V>(namespace, jedisContainer, pendingPrepare.entrySet().iterator(), pendingCommit.entrySet().iterator(),
    +        return new RedisKeyValueStateIterator<K, V>(namespace, container, pendingPrepare.entrySet().iterator(), pendingCommit.entrySet().iterator(),
                     ITERATOR_CHUNK_SIZE, encoder.getKeySerializer(), encoder.getValueSerializer());
         }
     
         @Override
         public void prepareCommit(long txid) {
             LOG.debug("prepareCommit txid {}", txid);
             validatePrepareTxid(txid);
    -        JedisCommands commands = null;
    +        RedisCommands commands = null;
             try {
    -            Map<String, String> currentPending = pendingPrepare;
    -            pendingPrepare = new ConcurrentHashMap<>();
    -            commands = jedisContainer.getInstance();
    +            Map<ByteArrayWrapper, byte[]> currentPending = pendingPrepare;
    +            pendingPrepare = createPendingPrepareMap();
    +            commands = container.getInstance();
                 if (commands.exists(prepareNamespace)) {
                     LOG.debug("Prepared txn already exists, will merge", txid);
    -                for (Map.Entry<String, String> e: pendingCommit.entrySet()) {
    +                for (Map.Entry<ByteArrayWrapper, byte[]> e: pendingCommit.entrySet()) {
                         if (!currentPending.containsKey(e.getKey())) {
                             currentPending.put(e.getKey(), e.getValue());
                         }
                     }
                 }
                 if (!currentPending.isEmpty()) {
    -                commands.hmset(prepareNamespace, currentPending);
    +                commands.hmset(prepareNamespace, ByteArrayUtil.Maps.newHashMapUnwrappingKey(currentPending));
    --- End diff --
    
    Great suggestion. I thought it was bad but didn't know of any alternatives. I'll try to replace it.
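    The diff above wraps keys in `ByteArrayWrapper` and then copies the map with `ByteArrayUtil.Maps.newHashMapUnwrappingKey` before HMSET. The suggestion itself isn't quoted here; one alternative consistent with later revisions of this PR (which use `createPendingPrepareMap()` and `NavigableMap<byte[], byte[]>`) is a `ConcurrentSkipListMap<byte[], byte[]>` with an unsigned byte comparator. It looks up keys by content, stays thread-safe like the previous `ConcurrentHashMap`, and can be passed to `hmset` without copying. Whether `createPendingPrepareMap()` does exactly this is my assumption; `PendingPrepare` and `mergeOlder` are hypothetical.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

final class PendingPrepare {
    // Unsigned lexicographic byte[] order (stand-in for Guava's comparator).
    static final Comparator<byte[]> ORDER = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    };

    private PendingPrepare() {
    }

    // Hypothetical createPendingPrepareMap(): thread-safe, content-keyed, no wrapper type.
    static ConcurrentNavigableMap<byte[], byte[]> createPendingPrepareMap() {
        return new ConcurrentSkipListMap<>(ORDER);
    }

    // Mirrors the prepareCommit() merge: newer pendingPrepare entries win, and anything
    // from the older pendingCommit that was not overwritten since is carried over.
    static void mergeOlder(Map<byte[], byte[]> current, Map<byte[], byte[]> older) {
        for (Map.Entry<byte[], byte[]> e : older.entrySet()) {
            current.putIfAbsent(e.getKey(), e.getValue());
        }
    }
}
```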


[GitHub] storm issue #1950: STORM-2369 [storm-redis] Use binary type for State manage...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the issue:

    https://github.com/apache/storm/pull/1950
  
    +1


[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r123976213
  
    --- Diff: storm-core/src/jvm/org/apache/storm/state/DefaultStateEncoder.java ---
    @@ -15,27 +15,23 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    -package org.apache.storm.redis.utils;
    +package org.apache.storm.state;
     
     import com.google.common.base.Optional;
     
    -import org.apache.commons.codec.binary.Base64;
    -import org.apache.storm.state.DefaultStateSerializer;
    -import org.apache.storm.state.Serializer;
    -
     /**
    - * Helper class for encoding/decoding redis key values.
    + * Helper class for encoding/decoding key values.
      */
    -public class RedisEncoder<K, V> {
    +public class DefaultStateEncoder<K, V> {
    --- End diff --
    
    Maybe this can be the default implementation of a StateEncoder interface?
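    A sketch of what such an interface might look like, assuming the raw storage types become type parameters so that a store without binary support could plug in a string-based encoder. `StateEncoder`, `Utf8BinaryEncoder` and `Base64StringEncoder` are hypothetical; the string adapter simply layers Base64 on top of any binary encoder, roughly what the pre-PR Redis encoding did.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical interface: KENC/VENC are the types the backing store accepts.
interface StateEncoder<K, V, KENC, VENC> {
    KENC encodeKey(K key);

    VENC encodeValue(V value);

    K decodeKey(KENC encoded);

    V decodeValue(VENC encoded);

    VENC tombstone();
}

// Example binary encoder for String keys/values. Live values carry a leading 1 byte;
// the tombstone is the single byte 0, so the two can never collide.
final class Utf8BinaryEncoder implements StateEncoder<String, String, byte[], byte[]> {
    public byte[] encodeKey(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    public String decodeKey(byte[] encoded) {
        return new String(encoded, StandardCharsets.UTF_8);
    }

    public byte[] encodeValue(String value) {
        byte[] body = value.getBytes(StandardCharsets.UTF_8);
        byte[] out = new byte[body.length + 1];
        out[0] = 1;
        System.arraycopy(body, 0, out, 1, body.length);
        return out;
    }

    // Returns null for the tombstone.
    public String decodeValue(byte[] encoded) {
        return encoded[0] == 0 ? null : new String(encoded, 1, encoded.length - 1, StandardCharsets.UTF_8);
    }

    public byte[] tombstone() {
        return new byte[] {0};
    }
}

// Adapter for a store that only accepts strings: Base64 on top of any binary encoder.
final class Base64StringEncoder<K, V> implements StateEncoder<K, V, String, String> {
    private final StateEncoder<K, V, byte[], byte[]> binary;

    Base64StringEncoder(StateEncoder<K, V, byte[], byte[]> binary) {
        this.binary = binary;
    }

    public String encodeKey(K key) {
        return Base64.getEncoder().encodeToString(binary.encodeKey(key));
    }

    public String encodeValue(V value) {
        return Base64.getEncoder().encodeToString(binary.encodeValue(value));
    }

    public K decodeKey(String encoded) {
        return binary.decodeKey(Base64.getDecoder().decode(encoded));
    }

    public V decodeValue(String encoded) {
        return binary.decodeValue(Base64.getDecoder().decode(encoded));
    }

    public String tombstone() {
        return Base64.getEncoder().encodeToString(binary.tombstone());
    }
}
```

    With this shape, binary stores use the default encoder directly, and a string-only store pays the Base64 cost only when it actually needs to.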


[GitHub] storm issue #1950: STORM-2369 [storm-redis] Use binary type for State manage...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1950
  
    @arunmahadevan Thanks, I've addressed the latest review comment and squashed the commits.


[GitHub] storm issue #1950: STORM-2369 [storm-redis] Use binary type for State manage...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1950
  
    @arunmahadevan Thanks for the further review comments. I addressed your comments without versioning state.


[GitHub] storm issue #1950: STORM-2369 [storm-redis] Use binary type for State manage...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1950
  
    @arunmahadevan Ah, please review #2172 as well when you revisit this. Thanks in advance!


[GitHub] storm issue #1950: STORM-2369 [storm-redis] Use binary type for State manage...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1950
  
    @arunmahadevan
    Using binary types saves:
    - the Base64 encode/decode step on every KV pair
    - a second conversion between String and byte[] (Jedis' SafeEncoder) whenever data is sent to or received from Redis
    - Redis traffic and memory, since Base64 makes each KV about a third larger
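    The size overhead in the last point is easy to quantify: standard padded Base64 turns every 3 input bytes into 4 output characters, so n bytes become 4 * ceil(n / 3) characters, about 33% more (a 300-byte value becomes 400 characters). A small check against the JDK encoder; `Base64Overhead` is a hypothetical helper.

```java
import java.util.Base64;

final class Base64Overhead {
    private Base64Overhead() {
    }

    // Length of standard, padded Base64 for n input bytes: 4 * ceil(n / 3).
    static int encodedLength(int n) {
        return 4 * ((n + 2) / 3);
    }

    // Length actually produced by the JDK encoder, for comparison.
    static int actualLength(int n) {
        return Base64.getEncoder().encodeToString(new byte[n]).length();
    }
}
```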


[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r109653347
  
    --- Diff: external/storm-redis/src/main/java/org/apache/storm/redis/common/commands/RedisCommands.java ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.redis.common.commands;
    +
    +import redis.clients.jedis.ScanParams;
    +import redis.clients.jedis.ScanResult;
    +
    +import java.util.Map;
    +
    +/**
    + * This interface represents Jedis methods exhaustively which are used on storm-redis.
    + *
    + * This is a workaround since Jedis and JedisCluster doesn't implement same interface for binary type of methods,
    + * and unify binary methods and string methods into one interface.
    + */
    +public interface RedisCommands {
    +    // binary, common
    +    Boolean exists(byte[] key);
    +
    +    Long del(byte[] key);
    +
    +    // binary, hash
    +    byte[] hget(byte[] key, byte[] field);
    +
    +    String hmset(byte[] key, Map<byte[], byte[]> fieldValues);
    +
    +    Map<byte[], byte[]> hgetAll(byte[] key);
    +
    +    Long hdel(byte[] key, byte[]... fields);
    +
    +    ScanResult<Map.Entry<byte[], byte[]>> hscan(byte[] key, byte[] cursor, ScanParams params);
    +
    +    // string, common
    +    boolean exists(String key);
    --- End diff --
    
    Nit: we could keep overloaded methods together. That would make the interface easier to read and make it clear which methods are overloaded.
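The suggested layout, with each String overload placed directly next to its byte[] counterpart, could look roughly like the sketch below. `GroupedCommands` and `InMemoryCommands` are hypothetical names. The interface is a trimmed-down illustration: `set` is added for the example, and the in-memory implementation is not a Jedis wrapper. The String overloads delegate to the binary ones, and keys are wrapped in `ByteBuffer` because `byte[]` uses identity `equals`/`hashCode` and is therefore a poor `Map` key.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, trimmed-down command interface with overloads grouped together.
interface GroupedCommands {
    Boolean exists(byte[] key);
    Boolean exists(String key);

    Long del(byte[] key);
    Long del(String key);

    String set(byte[] key, byte[] value);
    String set(String key, String value);
}

// Hypothetical in-memory implementation, for illustration only.
public class InMemoryCommands implements GroupedCommands {
    // ByteBuffer compares remaining content in equals/hashCode, unlike byte[].
    private final Map<ByteBuffer, byte[]> store = new HashMap<>();

    private static ByteBuffer wrap(byte[] key) {
        return ByteBuffer.wrap(key.clone()); // defensive copy so callers cannot mutate stored keys
    }

    private static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public Boolean exists(byte[] key) {
        return store.containsKey(wrap(key));
    }

    @Override
    public Boolean exists(String key) {
        return exists(utf8(key)); // String overload delegates to the binary one
    }

    @Override
    public Long del(byte[] key) {
        return store.remove(wrap(key)) != null ? 1L : 0L;
    }

    @Override
    public Long del(String key) {
        return del(utf8(key));
    }

    @Override
    public String set(byte[] key, byte[] value) {
        store.put(wrap(key), value.clone());
        return "OK";
    }

    @Override
    public String set(String key, String value) {
        return set(utf8(key), utf8(value));
    }
}
```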


---

[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r123957706
  
    --- Diff: docs/State-checkpointing.md ---
    @@ -70,6 +70,46 @@ json config with the following properties.
         }
     }
     ```
    + 
    +For Redis Cluster state this is a json config with the following properties.
    + 
    +```
    + {
    +   "keyClass": "Optional fully qualified class name of the Key type.",
    +   "valueClass": "Optional fully qualified class name of the Value type.",
    +   "keySerializerClass": "Optional Key serializer implementation class.",
    +   "valueSerializerClass": "Optional Value Serializer implementation class.",
    +   "jedisClusterConfig": {
    +     "nodes": ["localhost:7379", "localhost:7380", "localhost:7381"],
    +     "timeout": 2000,
    +     "maxRedirections": 5
    +   }
    + }
    +```
    +
    +NOTE: If you used Redis state with Storm version 1.1.0 or earlier, you would need to also migrate your state since the representation of state has changed  
    --- End diff --
    
    If the user runs with the old state, do you throw an exception to let them know that they need to migrate? Should we add a state version, perhaps stored along with the checkpoint spout state, to handle this better in the future?
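A version marker along the lines the reviewer suggests might work as in the sketch below. Everything here is hypothetical: the field name `__state_version__`, the version numbers, and the in-memory map that stands in for the Redis hash are illustrative assumptions, not storm-redis APIs. For brevity the sketch also stores the marker next to the state itself rather than with the checkpoint spout state. A fresh state is stamped with the current version. Existing state without a marker is treated as the legacy format and rejected with a message asking for migration.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: fail fast when state was written in an older representation.
public class StateVersionCheck {
    static final String VERSION_FIELD = "__state_version__"; // hypothetical field name
    static final int LEGACY_VERSION = 1;                     // hypothetical: Base64/String format
    static final int CURRENT_VERSION = 2;                    // hypothetical: binary format

    // Stands in for the Redis hash holding the state.
    private final Map<String, byte[]> stateHash = new HashMap<>();

    public void put(String field, byte[] value) {
        stateHash.put(field, value);
    }

    public void writeVersion(int version) {
        stateHash.put(VERSION_FIELD, Integer.toString(version).getBytes(StandardCharsets.UTF_8));
    }

    // A missing marker on non-empty state means it predates versioning.
    public int readVersion() {
        byte[] raw = stateHash.get(VERSION_FIELD);
        return raw == null ? LEGACY_VERSION : Integer.parseInt(new String(raw, StandardCharsets.UTF_8));
    }

    public void initOrCheck() {
        if (stateHash.isEmpty()) {
            writeVersion(CURRENT_VERSION); // fresh state: stamp it with the current format
            return;
        }
        int found = readVersion();
        if (found != CURRENT_VERSION) {
            throw new IllegalStateException("State was written in format version " + found
                    + " but this version expects " + CURRENT_VERSION
                    + "; migrate the state before restarting the topology.");
        }
    }
}
```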


---

[GitHub] storm pull request #1950: STORM-2369 [storm-redis] Use binary type for State...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r124187943
  
    --- Diff: storm-core/src/jvm/org/apache/storm/state/DefaultStateEncoder.java ---
    @@ -15,27 +15,23 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    -package org.apache.storm.redis.utils;
    +package org.apache.storm.state;
     
     import com.google.common.base.Optional;
     
    -import org.apache.commons.codec.binary.Base64;
    -import org.apache.storm.state.DefaultStateSerializer;
    -import org.apache.storm.state.Serializer;
    -
     /**
    - * Helper class for encoding/decoding redis key values.
    + * Helper class for encoding/decoding key values.
      */
    -public class RedisEncoder<K, V> {
    +public class DefaultStateEncoder<K, V> {
    --- End diff --
    
    I was thinking that the `byte[]` format is specific to Redis. Do we expect other implementations to encode in a similar format as well? If other implementations need different formats, we can refactor this later.
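One argument for a store-agnostic encoder is that the only contract it needs is "object in, `byte[]` out". Any backend that accepts binary keys and values could then reuse it, and no Base64 step is involved. The sketch below illustrates that idea. `ByteSerializer`, `JavaSerializer`, and `BinaryStateEncoder` are hypothetical names declared here so the example is self-contained. It uses plain `java.io` serialization in place of Storm's own serializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical byte[]-in/byte[]-out serializer contract.
interface ByteSerializer<T> {
    byte[] serialize(T obj);
    T deserialize(byte[] bytes);
}

// Hypothetical serializer based on java.io serialization, for illustration only.
class JavaSerializer<T extends Serializable> implements ByteSerializer<T> {
    @Override
    public byte[] serialize(T obj) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
            oos.flush(); // drain buffered block data before reading the bytes
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    @SuppressWarnings("unchecked")
    public T deserialize(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}

// Store-agnostic encoder: raw byte[] keys and values, no Base64 step.
public class BinaryStateEncoder<K, V> {
    private final ByteSerializer<K> keySerializer;
    private final ByteSerializer<V> valueSerializer;

    public BinaryStateEncoder(ByteSerializer<K> keySerializer, ByteSerializer<V> valueSerializer) {
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
    }

    public byte[] encodeKey(K key) { return keySerializer.serialize(key); }
    public byte[] encodeValue(V value) { return valueSerializer.serialize(value); }
    public K decodeKey(byte[] raw) { return keySerializer.deserialize(raw); }
    public V decodeValue(byte[] raw) { return valueSerializer.deserialize(raw); }
}
```

If a backend later needed a text-safe format, it could wrap this encoder and apply its own encoding, instead of forcing that cost on every store.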


---