Posted to commits@storm.apache.org by ar...@apache.org on 2016/06/09 05:46:38 UTC

[1/3] storm git commit: STORM-1873 Configurable behaviour for late tuples

Repository: storm
Updated Branches:
  refs/heads/master 2c77a2001 -> 04e4fdea0


STORM-1873 Configurable behaviour for late tuples


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/eccda116
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/eccda116
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/eccda116

Branch: refs/heads/master
Commit: eccda11615d0a5089d58754c9390371155c4e012
Parents: a9ef86d
Author: Balazs Kossovics <ba...@s4m.io>
Authored: Thu Jun 2 17:43:10 2016 +0200
Committer: Balazs Kossovics <ba...@s4m.io>
Committed: Wed Jun 8 14:05:01 2016 +0200

----------------------------------------------------------------------
 docs/Windowing.md                               | 24 +++++-
 .../clj/org/apache/storm/daemon/executor.clj    |  1 +
 storm-core/src/jvm/org/apache/storm/Config.java |  8 ++
 .../storm/topology/WindowedBoltExecutor.java    | 25 +++++-
 .../topology/base/BaseStatefulWindowedBolt.java |  8 ++
 .../storm/topology/base/BaseWindowedBolt.java   | 14 ++++
 .../topology/WindowedBoltExecutorTest.java      | 87 +++++++++++++++++++-
 7 files changed, 161 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/eccda116/docs/Windowing.md
----------------------------------------------------------------------
diff --git a/docs/Windowing.md b/docs/Windowing.md
index 9f5869f..82212ea 100644
--- a/docs/Windowing.md
+++ b/docs/Windowing.md
@@ -149,10 +149,6 @@ The value for the above `fieldName` will be looked up from the incoming tuple an
 If the field is not present in the tuple an exception will be thrown. Along with the timestamp field name, a time lag parameter 
 can also be specified which indicates the max time limit for tuples with out of order timestamps. 
 
-E.g. If the lag is 5 secs and a tuple `t1` arrived with timestamp `06:00:05` no tuples may arrive with tuple timestamp earlier than `06:00:00`. If a tuple
-arrives with timestamp 05:59:59 after `t1` and the window has moved past `t1`, it will be treated as a late tuple and not processed. Currently the late
- tuples are just logged in the worker log files at INFO level.
-
 ```java
 /**
 * Specify the maximum time lag of the tuple timestamp in milliseconds. It means that the tuple timestamps
@@ -163,6 +159,26 @@ arrives with timestamp 05:59:59 after `t1` and the window has moved past `t1`, i
 public BaseWindowedBolt withLag(Duration duration)
 ```
 
+E.g. If the lag is 5 secs and a tuple `t1` arrived with timestamp `06:00:05` no tuples may arrive with tuple timestamp earlier than `06:00:00`. If a tuple
+arrives with timestamp 05:59:59 after `t1` and the window has moved past `t1`, it will be treated as a late tuple. Late tuples are not processed by default,
+just logged in the worker log files at INFO level.
+
+```java
+/**
+ * Specify a stream id on which late tuples are going to be emitted. They are going to be accessible via the
+ * {@link org.apache.storm.topology.WindowedBoltExecutor#LATE_TUPLE_FIELD} field.
+ * It must be defined on a per-component basis, and in conjunction with the
+ * {@link BaseWindowedBolt#withTimestampField}, otherwise {@link IllegalArgumentException} will be thrown.
+ *
+ * @param streamId the name of the stream used to emit late tuples on
+ */
+public BaseWindowedBolt withLateTupleStream(String streamId)
+
+```
+This behaviour can be changed by specifying the above `streamId`. In this case late tuples are going to be emitted on the specified stream and accessible
+via the field `WindowedBoltExecutor.LATE_TUPLE_FIELD`.
+
+
 ### Watermarks
 For processing tuples with timestamp field, storm internally computes watermarks based on the incoming tuple timestamp. Watermark is 
 the minimum of the latest tuple timestamps (minus the lag) across all the input streams. At a higher level this is similar to the watermark concept

http://git-wip-us.apache.org/repos/asf/storm/blob/eccda116/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index d960ba3..6c94321 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -112,6 +112,7 @@
                         TOPOLOGY-BOLTS-SLIDING-INTERVAL-COUNT
                         TOPOLOGY-BOLTS-SLIDING-INTERVAL-DURATION-MS
                         TOPOLOGY-BOLTS-TUPLE-TIMESTAMP-FIELD-NAME
+                        TOPOLOGY-BOLTS-LATE-TUPLE-STREAM
                         TOPOLOGY-BOLTS-TUPLE-TIMESTAMP-MAX-LAG-MS
                         TOPOLOGY-BOLTS-MESSAGE-ID-FIELD-NAME
                         TOPOLOGY-STATE-PROVIDER

http://git-wip-us.apache.org/repos/asf/storm/blob/eccda116/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 73809f1..c03669b 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -1872,6 +1872,14 @@ public class Config extends HashMap<String, Object> {
     @isString
     public static final String TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME = "topology.bolts.tuple.timestamp.field.name";
 
+    /**
+     * Bolt-specific configuration for windowed bolts to specify the name of the stream on which late tuples are
+     * going to be emitted. This configuration should only be used from the BaseWindowedBolt.withLateTupleStream builder
+     * method, and not as global parameter, otherwise IllegalArgumentException is going to be thrown.
+     */
+    @isString
+    public static final String TOPOLOGY_BOLTS_LATE_TUPLE_STREAM = "topology.bolts.late.tuple.stream";
+
     /*
      * Bolt-specific configuration for windowed bolts to specify the maximum time lag of the tuple timestamp
      * in milliseconds. It means that the tuple timestamps cannot be out of order by more than this amount.

http://git-wip-us.apache.org/repos/asf/storm/blob/eccda116/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
index 9439ef2..fe0f4da 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
@@ -23,7 +23,9 @@ import org.apache.storm.spout.CheckpointSpout;
 import org.apache.storm.task.IOutputCollector;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
 import org.apache.storm.windowing.CountEvictionPolicy;
 import org.apache.storm.windowing.CountTriggerPolicy;
 import org.apache.storm.windowing.EvictionPolicy;
@@ -57,12 +59,14 @@ public class WindowedBoltExecutor implements IRichBolt {
     private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
     private static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000; // 1s
     private static final int DEFAULT_MAX_LAG_MS = 0; // no lag
+    public static final String LATE_TUPLE_FIELD = "late_tuple";
     private final IWindowedBolt bolt;
     private transient WindowedOutputCollector windowedOutputCollector;
     private transient WindowLifecycleListener<Tuple> listener;
     private transient WindowManager<Tuple> windowManager;
     private transient int maxLagMs;
     private transient String tupleTsFieldName;
+    private transient String lateTupleStream;
     private transient TriggerPolicy<Tuple> triggerPolicy;
     private transient EvictionPolicy<Tuple> evictionPolicy;
     // package level for unit tests
@@ -163,6 +167,13 @@ public class WindowedBoltExecutor implements IRichBolt {
         // tuple ts
         if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME)) {
             tupleTsFieldName = (String) stormConf.get(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME);
+            // late tuple stream
+            lateTupleStream = (String) stormConf.get(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
+            if (lateTupleStream != null) {
+                if (!context.getThisStreams().contains(lateTupleStream)) {
+                    throw new IllegalArgumentException("Stream for late tuples must be defined with the builder method withLateTupleStream");
+                }
+            }
             // max lag
             if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)) {
                 maxLagMs = ((Number) stormConf.get(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)).intValue();
@@ -178,6 +189,10 @@ public class WindowedBoltExecutor implements IRichBolt {
             }
             waterMarkEventGenerator = new WaterMarkEventGenerator<>(manager, watermarkInterval,
                                                                     maxLagMs, getComponentStreams(context));
+        } else {
+            if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM)) {
+                throw new IllegalArgumentException("Late tuple stream can be defined only when specifying a timestamp field");
+            }
         }
         // validate
         validate(stormConf, windowLengthCount, windowLengthDuration,
@@ -268,8 +283,12 @@ public class WindowedBoltExecutor implements IRichBolt {
             if (waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts)) {
                 windowManager.add(input, ts);
             } else {
+                if (lateTupleStream != null) {
+                    windowedOutputCollector.emit(lateTupleStream, input, new Values(input));
+                } else {
+                    LOG.info("Received a late tuple {} with ts {}. This will not be processed.", input, ts);
+                }
                 windowedOutputCollector.ack(input);
-                LOG.info("Received a late tuple {} with ts {}. This will not processed.", input, ts);
             }
         } else {
             windowManager.add(input);
@@ -284,6 +303,10 @@ public class WindowedBoltExecutor implements IRichBolt {
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        String lateTupleStream = (String) getComponentConfiguration().get(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
+        if (lateTupleStream != null) {
+            declarer.declareStream(lateTupleStream, new Fields(LATE_TUPLE_FIELD));
+        }
         bolt.declareOutputFields(declarer);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/eccda116/storm-core/src/jvm/org/apache/storm/topology/base/BaseStatefulWindowedBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/base/BaseStatefulWindowedBolt.java b/storm-core/src/jvm/org/apache/storm/topology/base/BaseStatefulWindowedBolt.java
index 850904b..0c35b9d 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/base/BaseStatefulWindowedBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/base/BaseStatefulWindowedBolt.java
@@ -103,6 +103,14 @@ public abstract class BaseStatefulWindowedBolt<T extends State> extends BaseWind
         return this;
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public BaseStatefulWindowedBolt<T> withLateTupleStream(String streamName) {
+        super.withLateTupleStream(streamName);
+        return this;
+    }
 
     /**
      * {@inheritDoc}

http://git-wip-us.apache.org/repos/asf/storm/blob/eccda116/storm-core/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java b/storm-core/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java
index e1725dd..6a0a93c 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java
@@ -255,6 +255,20 @@ public abstract class BaseWindowedBolt implements IWindowedBolt {
     }
 
     /**
+     * Specify a stream id on which late tuples are going to be emitted. They are going to be accessible via the
+     * {@link org.apache.storm.topology.WindowedBoltExecutor#LATE_TUPLE_FIELD} field.
+     * It must be defined on a per-component basis, and in conjunction with the
+     * {@link BaseWindowedBolt#withTimestampField}, otherwise {@link IllegalArgumentException} will be thrown.
+     *
+     * @param streamId the name of the stream used to emit late tuples on
+     */
+    public BaseWindowedBolt withLateTupleStream(String streamId) {
+        windowConfiguration.put(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, streamId);
+        return this;
+    }
+
+
+    /**
      * Specify the maximum time lag of the tuple timestamp in milliseconds. It means that the tuple timestamps
      * cannot be out of order by more than this amount.
      *

http://git-wip-us.apache.org/repos/asf/storm/blob/eccda116/storm-core/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java b/storm-core/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
index 0ba1e24..f6822ce 100644
--- a/storm-core/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
+++ b/storm-core/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
@@ -28,18 +28,21 @@ import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.TupleImpl;
 import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Time;
 import org.apache.storm.windowing.TupleWindow;
-import org.apache.storm.windowing.WaterMarkEvent;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.*;
 
 /**
@@ -87,6 +90,7 @@ public class WindowedBoltExecutorTest {
 
     private TopologyContext getTopologyContext() {
         TopologyContext context = Mockito.mock(TopologyContext.class);
+
         Map<GlobalStreamId, Grouping> sources = Collections.singletonMap(
                 new GlobalStreamId("s1", "default"),
                 null
@@ -139,4 +143,85 @@ public class WindowedBoltExecutorTest {
         assertArrayEquals(new long[]{618, 626},
                           new long[]{(long) third.get().get(0).getValue(0), (long)third.get().get(1).getValue(0)});
     }
+
+    @Test
+    public void testPrepareLateTupleStreamWithoutTs() throws Exception {
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000);
+        conf.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20);
+        conf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10);
+        conf.put(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, "$late");
+        conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5);
+        conf.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 10);
+
+        testWindowedBolt = new TestWindowedBolt();
+        executor = new WindowedBoltExecutor(testWindowedBolt);
+        TopologyContext context = getTopologyContext();
+        // emulate the call of withLateTupleStream method
+        Mockito.when(context.getThisStreams()).thenReturn(new HashSet<>(Arrays.asList("default", "$late")));
+        try {
+            executor.prepare(conf, context, getOutputCollector());
+            fail();
+        } catch (IllegalArgumentException e) {
+            assertThat(e.getMessage(), is("Late tuple stream can be defined only when specifying a timestamp field"));
+        }
+    }
+
+
+    @Test
+    public void testPrepareLateTUpleStreamWithoutBuilder() throws Exception {
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000);
+        conf.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20);
+        conf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10);
+        conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME, "ts");
+        conf.put(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, "$late");
+        conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5);
+        conf.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 10);
+
+        testWindowedBolt = new TestWindowedBolt();
+        executor = new WindowedBoltExecutor(testWindowedBolt);
+        TopologyContext context = getTopologyContext();
+        try {
+            executor.prepare(conf, context, getOutputCollector());
+            fail();
+        } catch (IllegalArgumentException e) {
+            assertThat(e.getMessage(), is("Stream for late tuples must be defined with the builder method withLateTupleStream"));
+        }
+    }
+
+
+    @Test
+    public void testExecuteWithLateTupleStream() throws Exception {
+        testWindowedBolt = new TestWindowedBolt();
+        executor = new WindowedBoltExecutor(testWindowedBolt);
+        TopologyContext context = getTopologyContext();
+        Mockito.when(context.getThisStreams()).thenReturn(new HashSet<>(Arrays.asList("default", "$late")));
+
+        OutputCollector outputCollector = Mockito.mock(OutputCollector.class);
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000);
+        conf.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20);
+        conf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10);
+        conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME, "ts");
+        conf.put(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, "$late");
+        conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5);
+        conf.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 10);
+        executor.prepare(conf, context, outputCollector);
+
+        long[] timstamps = {603, 605, 607, 618, 626, 636, 600};
+        List<Tuple> tuples = new ArrayList<>(timstamps.length);
+
+        executor.waterMarkEventGenerator.run();
+        for (long ts : timstamps) {
+            Tuple tuple = getTuple("s1", new Fields("ts"), new Values(ts));
+            tuples.add(tuple);
+            executor.execute(tuple);
+            Time.sleep(10);
+        }
+
+        System.out.println(testWindowedBolt.tupleWindows);
+        Tuple tuple = tuples.get(tuples.size() - 1);
+        Mockito.verify(outputCollector).emit("$late", Arrays.asList(tuple), new Values(tuple));
+    }
 }
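
For readers skimming the patch, the new branch in `WindowedBoltExecutor.execute` can be modelled without Storm. The following is only a minimal sketch under stated assumptions, not the committed implementation: the class `LateTupleRoutingSketch`, its `Outcome` enum and its methods are hypothetical names. It assumes a single input stream, and the watermark is advanced by an explicit method call rather than by the periodic `WaterMarkEventGenerator`. A tuple counts as late when its timestamp is below the current watermark (latest timestamp seen minus the lag). It is then either emitted on the configured late stream or, when no late stream is configured, logged and dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Storm-free model of the late tuple decision in this commit.
public class LateTupleRoutingSketch {

    public enum Outcome { WINDOWED, EMITTED_LATE, LOGGED_AND_DROPPED }

    private final long maxLagMs;
    private final String lateStream;          // null models "withLateTupleStream not called"
    private long latestTs = Long.MIN_VALUE;   // highest timestamp seen so far
    private long watermark = Long.MIN_VALUE;  // tuples below this value are late
    private final List<Long> lateEmitted = new ArrayList<>();

    public LateTupleRoutingSketch(long maxLagMs, String lateStream) {
        this.maxLagMs = maxLagMs;
        this.lateStream = lateStream;
    }

    // Assumption: stands in for the periodic watermark event; only moves forward.
    public void advanceWatermark() {
        if (latestTs == Long.MIN_VALUE) {
            return; // nothing seen yet
        }
        long candidate = latestTs - maxLagMs;
        if (candidate > watermark) {
            watermark = candidate;
        }
    }

    // Mirrors the shape of the patched branch: window the tuple, or route it as late.
    public Outcome offer(long ts) {
        if (ts > latestTs) {
            latestTs = ts;
        }
        if (ts >= watermark) {
            return Outcome.WINDOWED;
        }
        if (lateStream != null) {
            lateEmitted.add(ts); // stands in for emitting on the late stream
            return Outcome.EMITTED_LATE;
        }
        return Outcome.LOGGED_AND_DROPPED; // default behaviour: log at INFO, do not process
    }

    public List<Long> lateEmitted() {
        return lateEmitted;
    }

    public static void main(String[] args) {
        // Same timestamps and lag as testExecuteWithLateTupleStream in the patch.
        LateTupleRoutingSketch sketch = new LateTupleRoutingSketch(5, "$late");
        long[] timestamps = {603, 605, 607, 618, 626, 636, 600};
        for (long ts : timestamps) {
            System.out.println(ts + " -> " + sketch.offer(ts));
            sketch.advanceWatermark();
        }
    }
}
```

In this model the final timestamp, 600, arrives after the watermark has reached 631 (636 minus the 5 ms lag), so it is routed to the late stream, while every earlier tuple is windowed. Passing `null` as the stream name reproduces the default log-and-drop behaviour.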


[2/3] storm git commit: Merge branch 'late-tuple-poc-2' of https://github.com/kosii/storm into STORM-1873

Posted by ar...@apache.org.
Merge branch 'late-tuple-poc-2' of https://github.com/kosii/storm into STORM-1873


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1f0411a9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1f0411a9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1f0411a9

Branch: refs/heads/master
Commit: 1f0411a95726cbb8cf15bf3866023a49922f7dd8
Parents: 2c77a20 eccda11
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Thu Jun 9 10:37:18 2016 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Thu Jun 9 10:37:18 2016 +0530

----------------------------------------------------------------------
 docs/Windowing.md                               | 24 +++++-
 .../clj/org/apache/storm/daemon/executor.clj    |  1 +
 storm-core/src/jvm/org/apache/storm/Config.java |  8 ++
 .../storm/topology/WindowedBoltExecutor.java    | 25 +++++-
 .../topology/base/BaseStatefulWindowedBolt.java |  8 ++
 .../storm/topology/base/BaseWindowedBolt.java   | 14 ++++
 .../topology/WindowedBoltExecutorTest.java      | 87 +++++++++++++++++++-
 7 files changed, 161 insertions(+), 6 deletions(-)
----------------------------------------------------------------------



[3/3] storm git commit: Added STORM-1873 to Changelog

Posted by ar...@apache.org.
Added STORM-1873 to Changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/04e4fdea
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/04e4fdea
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/04e4fdea

Branch: refs/heads/master
Commit: 04e4fdea01bf0f9981d2cf86ecaa75940760d2f8
Parents: 1f0411a
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Thu Jun 9 10:38:35 2016 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Thu Jun 9 10:38:35 2016 +0530

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/04e4fdea/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 046eaea..4556dea 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 2.0.0
+ * STORM-1873: Implement alternative behaviour for late tuples
  * STORM-1878: Flux can now handle IStatefulBolts
  * STORM-1864: StormSubmitter should throw respective exceptions and log respective errors forregistered submitter hook invocation
  * STORM-1766: A better algorithm server rack selection for RAS