You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by tu...@apache.org on 2017/01/12 17:42:41 UTC
[1/2] apex-core git commit: APEXCORE-591 - SubscribeRequestTuple has wrong buffer size when mask is zero
Repository: apex-core
Updated Branches:
refs/heads/master 99d3a9538 -> 7ea7f6073
APEXCORE-591 - SubscribeRequestTuple has wrong buffer size when mask is zero
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/97b298e9
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/97b298e9
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/97b298e9
Branch: refs/heads/master
Commit: 97b298e934bcc5f452483e5dfaa0c956efe91612
Parents: 3f06ce7
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Fri Dec 16 08:31:05 2016 -0800
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Fri Dec 16 08:31:05 2016 -0800
----------------------------------------------------------------------
.../bufferserver/packet/DataTuple.java | 24 +------
.../bufferserver/packet/EmptyTuple.java | 32 ---------
.../packet/GenericRequestTuple.java | 37 ++++------
.../bufferserver/packet/PayloadTuple.java | 44 +++---------
.../packet/PublishRequestTuple.java | 5 ++
.../bufferserver/packet/PurgeRequestTuple.java | 5 ++
.../bufferserver/packet/RequestTuple.java | 31 +++------
.../bufferserver/packet/ResetRequestTuple.java | 5 ++
.../bufferserver/packet/ResetWindowTuple.java | 32 ++-------
.../packet/SubscribeRequestTuple.java | 72 +++++++-------------
.../datatorrent/bufferserver/packet/Tuple.java | 70 +++++++++----------
.../bufferserver/packet/WindowIdTuple.java | 37 +++-------
.../packet/SubscribeRequestTupleTest.java | 24 +++++--
13 files changed, 143 insertions(+), 275 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/97b298e9/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java
index cb1ad5f..aee10a6 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java
@@ -33,33 +33,15 @@ public class DataTuple extends Tuple
}
@Override
- public int getWindowId()
+ public MessageType getType()
{
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
- @Override
- public int getPartition()
- {
- throw new UnsupportedOperationException("Not supported yet.");
+ return MessageType.CODEC_STATE;
}
@Override
public Slice getData()
{
- return new Slice(buffer, offset + 1, length - 1);
- }
-
- @Override
- public int getBaseSeconds()
- {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
- @Override
- public int getWindowWidth()
- {
- throw new UnsupportedOperationException("Not supported yet.");
+ return new Slice(buffer, offset, limit - offset);
}
public static byte[] getSerializedTuple(byte type, Slice f)
http://git-wip-us.apache.org/repos/asf/apex-core/blob/97b298e9/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java
index 3c3f184..831311e 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java
@@ -18,8 +18,6 @@
*/
package com.datatorrent.bufferserver.packet;
-import com.datatorrent.netlet.util.Slice;
-
/**
* <p>EmptyTuple class.</p>
*
@@ -38,36 +36,6 @@ public class EmptyTuple extends Tuple
return MessageType.NO_MESSAGE;
}
- @Override
- public int getWindowId()
- {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
- @Override
- public int getPartition()
- {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
- @Override
- public Slice getData()
- {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
- @Override
- public int getBaseSeconds()
- {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
- @Override
- public int getWindowWidth()
- {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
public static byte[] getSerializedTuple(byte value)
{
return new byte[] {value};
http://git-wip-us.apache.org/repos/asf/apex-core/blob/97b298e9/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java
index c94c6d5..7ee28cc 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java
@@ -45,58 +45,51 @@ public class GenericRequestTuple extends RequestTuple
}
@Override
+ public MessageType getType()
+ {
+ return null;
+ }
+
+ @Override
public boolean isValid()
{
return valid;
}
@Override
- public void parse()
+ protected void parse()
{
parsed = true;
- int dataOffset = offset + 1;
- int limit = offset + length;
-
try {
/*
* read the version.
*/
- int idlen = readVarInt(dataOffset, limit);
+ int idlen = readVarInt();
if (idlen > 0) {
- while (buffer[dataOffset++] < 0) {
- }
- version = new String(buffer, dataOffset, idlen);
- dataOffset += idlen;
+ version = new String(buffer, offset, idlen);
+ offset += idlen;
} else if (idlen == 0) {
version = EMPTY_STRING;
- dataOffset++;
} else {
return;
}
/*
* read the identifier.
*/
- idlen = readVarInt(dataOffset, limit);
+ idlen = readVarInt();
if (idlen > 0) {
- while (buffer[dataOffset++] < 0) {
- }
- identifier = new String(buffer, dataOffset, idlen);
- dataOffset += idlen;
+ identifier = new String(buffer, offset, idlen);
+ offset += idlen;
} else if (idlen == 0) {
identifier = EMPTY_STRING;
- dataOffset++;
} else {
return;
}
- baseSeconds = readVarInt(dataOffset, limit);
- while (buffer[dataOffset++] < 0) {
- }
+ baseSeconds = readVarInt();
- windowId = readVarInt(dataOffset, limit);
- while (buffer[dataOffset++] < 0) {
- }
+ windowId = readVarInt();
valid = true;
} catch (NumberFormatException nfe) {
http://git-wip-us.apache.org/repos/asf/apex-core/blob/97b298e9/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java
index 256fc05..e75a314 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java
@@ -18,6 +18,8 @@
*/
package com.datatorrent.bufferserver.packet;
+import java.nio.ByteBuffer;
+
import com.datatorrent.netlet.util.Slice;
/**
@@ -41,23 +43,13 @@ public class PayloadTuple extends Tuple
@Override
public int getPartition()
{
- int p = buffer[offset + 1];
- p |= buffer[offset + 2] << 8;
- p |= buffer[offset + 3] << 16;
- p |= buffer[offset + 4] << 24;
- return p;
- }
-
- @Override
- public int getWindowId()
- {
- throw new UnsupportedOperationException("Not supported yet.");
+ return ByteBuffer.wrap(buffer, offset, 4).getInt();
}
@Override
public Slice getData()
{
- return new Slice(buffer, offset + 5, length - 5);
+ return new Slice(buffer, offset + 4, limit - offset - 4);
}
@Override
@@ -66,37 +58,21 @@ public class PayloadTuple extends Tuple
return "PayloadTuple{" + getPartition() + ", " + getData() + '}';
}
- @Override
- public int getBaseSeconds()
- {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
- @Override
- public int getWindowWidth()
- {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
public static byte[] getSerializedTuple(int partition, int size)
{
byte[] array = new byte[size + 5];
- array[0] = MessageType.PAYLOAD_VALUE;
- array[1] = (byte)partition;
- array[2] = (byte)(partition >> 8);
- array[3] = (byte)(partition >> 16);
- array[4] = (byte)(partition >> 24);
+ ByteBuffer byteBuffer = ByteBuffer.wrap(array);
+ byteBuffer.put(MessageType.PAYLOAD_VALUE);
+ byteBuffer.putInt(partition);
return array;
}
public static byte[] getSerializedTuple(int partition, Slice f)
{
byte[] array = new byte[5 + f.length];
- array[0] = MessageType.PAYLOAD_VALUE;
- array[1] = (byte)partition;
- array[2] = (byte)(partition >> 8);
- array[3] = (byte)(partition >> 16);
- array[4] = (byte)(partition >> 24);
+ ByteBuffer byteBuffer = ByteBuffer.wrap(array);
+ byteBuffer.put(MessageType.PAYLOAD_VALUE);
+ byteBuffer.putInt(partition);
System.arraycopy(f.buffer, f.offset, array, 5, f.length);
return array;
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/97b298e9/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java
index 6a9ba39..23d233e 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java
@@ -30,6 +30,11 @@ public class PublishRequestTuple extends GenericRequestTuple
super(array, offset, len);
}
+ public MessageType getType()
+ {
+ return MessageType.PUBLISHER_REQUEST;
+ }
+
public static byte[] getSerializedRequest(final String version, final String identifier, final long startingWindowId)
{
return GenericRequestTuple.getSerializedRequest(version, identifier, startingWindowId,
http://git-wip-us.apache.org/repos/asf/apex-core/blob/97b298e9/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PurgeRequestTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PurgeRequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PurgeRequestTuple.java
index 1db3131..cb65e14 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PurgeRequestTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PurgeRequestTuple.java
@@ -30,6 +30,11 @@ public class PurgeRequestTuple extends GenericRequestTuple
super(array, offset, length);
}
+ public MessageType getType()
+ {
+ return MessageType.PURGE_REQUEST;
+ }
+
public static byte[] getSerializedRequest(String version, String id, long windowId)
{
return GenericRequestTuple.getSerializedRequest(version, id, windowId, MessageType.PURGE_REQUEST_VALUE);
http://git-wip-us.apache.org/repos/asf/apex-core/blob/97b298e9/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java
index 53505b4..cc830a9 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java
@@ -18,7 +18,8 @@
*/
package com.datatorrent.bufferserver.packet;
-import com.datatorrent.netlet.util.Slice;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* <p>Abstract RequestTuple class.</p>
@@ -27,12 +28,18 @@ import com.datatorrent.netlet.util.Slice;
*/
public abstract class RequestTuple extends Tuple
{
+ private static final Logger logger = LoggerFactory.getLogger(RequestTuple.class);
+
protected boolean valid;
protected boolean parsed;
- public RequestTuple(byte[] buffer, int offset, int length)
+ protected RequestTuple(byte[] buffer, int offset, int length)
{
super(buffer, offset, length);
+ parse();
+ if (!isValid()) {
+ logger.error("Invalid Request Tuple of type {} received!", getType());
+ }
}
public boolean isValid()
@@ -40,25 +47,7 @@ public abstract class RequestTuple extends Tuple
return valid;
}
- @Override
- public int getPartition()
- {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
- @Override
- public Slice getData()
- {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
- @Override
- public int getWindowWidth()
- {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
- public abstract void parse();
+ protected abstract void parse();
public abstract String getVersion();
http://git-wip-us.apache.org/repos/asf/apex-core/blob/97b298e9/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java
index 17ca585..33796aa 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java
@@ -30,6 +30,11 @@ public class ResetRequestTuple extends GenericRequestTuple
super(array, offset, length);
}
+ public MessageType getType()
+ {
+ return MessageType.RESET_REQUEST;
+ }
+
public static byte[] getSerializedRequest(final String version, final String identifier, final long startingWindowId)
{
return GenericRequestTuple.getSerializedRequest(version, identifier, startingWindowId,
http://git-wip-us.apache.org/repos/asf/apex-core/blob/97b298e9/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java
index 6045416..c5b1c40 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java
@@ -18,8 +18,6 @@
*/
package com.datatorrent.bufferserver.packet;
-
-import com.datatorrent.netlet.util.Slice;
import com.datatorrent.netlet.util.VarInt;
/**
@@ -29,9 +27,14 @@ import com.datatorrent.netlet.util.VarInt;
*/
public class ResetWindowTuple extends Tuple
{
+ private final int baseSeconds;
+ private final int windowWidth;
+
public ResetWindowTuple(byte[] buffer, int offset, int length)
{
super(buffer, offset, length);
+ baseSeconds = readVarInt();
+ windowWidth = readVarInt();
}
@Override
@@ -41,36 +44,15 @@ public class ResetWindowTuple extends Tuple
}
@Override
- public int getWindowId()
- {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
- @Override
- public int getPartition()
- {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
- @Override
- public Slice getData()
- {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
- @Override
public int getBaseSeconds()
{
- return readVarInt(offset + 1, offset + length);
+ return baseSeconds;
}
@Override
public int getWindowWidth()
{
- int intervalOffset = offset + 1;
- while (buffer[intervalOffset++] < 0) {
- }
- return readVarInt(intervalOffset, offset + length);
+ return windowWidth;
}
public static byte[] getSerializedTuple(int baseSeconds, int windowWidth)
http://git-wip-us.apache.org/repos/asf/apex-core/blob/97b298e9/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java
index e029015..37efb70 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java
@@ -46,113 +46,91 @@ public class SubscribeRequestTuple extends RequestTuple
private int bufferSize;
@Override
+ public MessageType getType()
+ {
+ return MessageType.SUBSCRIBER_REQUEST;
+ }
+
+ @Override
public void parse()
{
parsed = true;
- int dataOffset = offset + 1;
- int limit = offset + length;
try {
/*
* read the version.
*/
- int idlen = readVarInt(dataOffset, limit);
+ int idlen = readVarInt();
if (idlen > 0) {
- while (buffer[dataOffset++] < 0) {
- }
- version = new String(buffer, dataOffset, idlen);
- dataOffset += idlen;
+ version = new String(buffer, offset, idlen);
+ offset += idlen;
} else if (idlen == 0) {
version = EMPTY_STRING;
- dataOffset++;
} else {
return;
}
/*
* read the identifier.
*/
- idlen = readVarInt(dataOffset, limit);
+ idlen = readVarInt();
if (idlen > 0) {
- while (buffer[dataOffset++] < 0) {
- }
- identifier = new String(buffer, dataOffset, idlen);
- dataOffset += idlen;
+ identifier = new String(buffer, offset, idlen);
+ offset += idlen;
} else if (idlen == 0) {
identifier = EMPTY_STRING;
- dataOffset++;
} else {
return;
}
- baseSeconds = readVarInt(dataOffset, limit);
- while (buffer[dataOffset++] < 0) {
- }
+ baseSeconds = readVarInt();
- windowId = readVarInt(dataOffset, limit);
- while (buffer[dataOffset++] < 0) {
- }
+ windowId = readVarInt();
/*
* read the type
*/
- idlen = readVarInt(dataOffset, limit);
+ idlen = readVarInt();
if (idlen > 0) {
- while (buffer[dataOffset++] < 0) {
- }
- streamType = new String(buffer, dataOffset, idlen);
- dataOffset += idlen;
+ streamType = new String(buffer, offset, idlen);
+ offset += idlen;
} else if (idlen == 0) {
streamType = EMPTY_STRING;
- dataOffset++;
} else {
return;
}
/*
* read the upstream identifier
*/
- idlen = readVarInt(dataOffset, limit);
+ idlen = readVarInt();
if (idlen > 0) {
- while (buffer[dataOffset++] < 0) {
- }
- upstreamIdentifier = new String(buffer, dataOffset, idlen);
- dataOffset += idlen;
+ upstreamIdentifier = new String(buffer, offset, idlen);
+ offset += idlen;
} else if (idlen == 0) {
upstreamIdentifier = EMPTY_STRING;
- dataOffset++;
} else {
return;
}
/*
* read the partition count
*/
- int count = readVarInt(dataOffset, limit);
+ int count = readVarInt();
if (count > 0) {
- while (buffer[dataOffset++] < 0) {
- }
- mask = readVarInt(dataOffset, limit);
- if (mask > 0) {
- while (buffer[dataOffset++] < 0) {
- }
- } else {
+ mask = readVarInt();
+ if (mask <= 0) {
/* mask cannot be zero */
return;
}
partitions = new int[count];
for (int i = 0; i < count; i++) {
- partitions[i] = readVarInt(dataOffset, limit);
+ partitions[i] = readVarInt();
if (partitions[i] == -1) {
return;
- } else {
- while (buffer[dataOffset++] < 0) {
- }
}
}
}
- bufferSize = readVarInt(dataOffset, limit);
+ bufferSize = readVarInt();
if (bufferSize == -1) {
return;
}
- while (buffer[dataOffset++] < 0) {
- }
valid = true;
} catch (NumberFormatException nfe) {
http://git-wip-us.apache.org/repos/asf/apex-core/blob/97b298e9/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java
index 408e8a2..a75c7f1 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java
@@ -35,15 +35,15 @@ public abstract class Tuple
{
public static final String CLASSIC_VERSION = "1.0";
public static final String FAST_VERSION = "1.1";
- public final byte[] buffer;
- public final int offset;
- public final int length;
+ protected final byte[] buffer;
+ protected int offset;
+ protected final int limit;
- public Tuple(byte[] array, int offset, int length)
+ protected Tuple(byte[] array, int offset, int length)
{
this.buffer = array;
- this.offset = offset;
- this.length = length;
+ this.offset = offset + 1;
+ this.limit = offset + length;
}
public static Tuple getTuple(byte[] buffer, int offset, int length)
@@ -77,36 +77,16 @@ public abstract class Tuple
return new WindowIdTuple(buffer, offset, length);
case PUBLISHER_REQUEST:
- PublishRequestTuple prt = new PublishRequestTuple(buffer, offset, length);
- prt.parse();
- if (!prt.isValid()) {
- logger.error("Unparseable Generic Request Tuple of type {} received!", MessageType.valueOf(buffer[offset]));
- }
- return prt;
+ return new PublishRequestTuple(buffer, offset, length);
case PURGE_REQUEST:
- PurgeRequestTuple purgert = new PurgeRequestTuple(buffer, offset, length);
- purgert.parse();
- if (!purgert.isValid()) {
- logger.error("Unparseable Purge Request Tuple of type {} received!", MessageType.valueOf(buffer[offset]));
- }
- return purgert;
+ return new PurgeRequestTuple(buffer, offset, length);
case RESET_REQUEST:
- ResetRequestTuple resetrt = new ResetRequestTuple(buffer, offset, length);
- resetrt.parse();
- if (!resetrt.isValid()) {
- logger.error("Unparseable Reset Request Tuple of type {} received!", MessageType.valueOf(buffer[offset]));
- }
- return resetrt;
+ return new ResetRequestTuple(buffer, offset, length);
case SUBSCRIBER_REQUEST:
- SubscribeRequestTuple srt = new SubscribeRequestTuple(buffer, offset, length);
- srt.parse();
- if (!srt.isValid()) {
- logger.error("Unparseable Subscriber Request Tuple received!");
- }
- return srt;
+ return new SubscribeRequestTuple(buffer, offset, length);
default:
return null;
@@ -120,7 +100,7 @@ public abstract class Tuple
return offset + identifier.getBytes().length;
}
- public int readVarInt(int offset, int limit)
+ protected int readVarInt()
{
if (offset < limit) {
byte tmp = buffer[offset++];
@@ -161,20 +141,32 @@ public abstract class Tuple
+ Arrays.toString(Arrays.copyOfRange(buffer, offset, limit)));
}
- public MessageType getType()
+ public abstract MessageType getType();
+
+ public int getWindowId()
{
- return MessageType.valueOf(buffer[offset]);
+ throw new UnsupportedOperationException("Not supported yet.");
}
- public abstract int getWindowId();
-
- public abstract int getPartition();
+ public int getPartition()
+ {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
- public abstract Slice getData();
+ public Slice getData()
+ {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
- public abstract int getBaseSeconds();
+ public int getBaseSeconds()
+ {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
- public abstract int getWindowWidth();
+ public int getWindowWidth()
+ {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
private static final Logger logger = LoggerFactory.getLogger(Tuple.class);
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/97b298e9/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java
index 014827c..7ac9c6c 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java
@@ -18,8 +18,6 @@
*/
package com.datatorrent.bufferserver.packet;
-
-import com.datatorrent.netlet.util.Slice;
import com.datatorrent.netlet.util.VarInt;
/**
@@ -29,45 +27,26 @@ import com.datatorrent.netlet.util.VarInt;
*/
public class WindowIdTuple extends Tuple
{
+ private final MessageType messageType;
+ private final int windowId;
+
public WindowIdTuple(byte[] array, int offset, int length)
{
super(array, offset, length);
- }
-
- @Override
- public int getWindowId()
- {
- return readVarInt(offset + 1, offset + length);
+ messageType = MessageType.valueOf(array[offset]);
+ windowId = readVarInt();
}
@Override
public MessageType getType()
{
- return MessageType.valueOf(buffer[offset]);
- }
-
- @Override
- public int getPartition()
- {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
- @Override
- public Slice getData()
- {
- throw new UnsupportedOperationException("Not supported yet.");
+ return messageType;
}
@Override
- public int getBaseSeconds()
- {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
- @Override
- public int getWindowWidth()
+ public int getWindowId()
{
- throw new UnsupportedOperationException("Not supported yet.");
+ return windowId;
}
@Override
http://git-wip-us.apache.org/repos/asf/apex-core/blob/97b298e9/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java
index 20c658a..f7b8829 100644
--- a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java
+++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java
@@ -23,7 +23,8 @@ import org.testng.annotations.Test;
import static com.datatorrent.bufferserver.packet.SubscribeRequestTuple.getSerializedRequest;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
/**
*
@@ -44,17 +45,30 @@ public class SubscribeRequestTupleTest
ArrayList<Integer> partitions = new ArrayList<Integer>();
partitions.add(5);
long startingWindowId = 0xcafebabe00000078L;
- byte[] serial = getSerializedRequest(null, id, down_type, upstream_id, mask, partitions, startingWindowId, 0);
+
+ byte[] serial = getSerializedRequest(null, id, down_type, upstream_id, mask, partitions, startingWindowId, 32 * 1024);
SubscribeRequestTuple tuple = (SubscribeRequestTuple)Tuple.getTuple(serial, 0, serial.length);
+
assertEquals(tuple.getIdentifier(), id, "Identifier");
assertEquals(tuple.getStreamType(), down_type, "UpstreamType");
assertEquals(tuple.getUpstreamIdentifier(), upstream_id, "UpstreamId");
assertEquals(tuple.getMask(), mask, "Mask");
-
+ assertEquals(tuple.getBufferSize(), 32 * 1024, "BufferSize");
int[] parts = tuple.getPartitions();
- assertTrue(parts != null && parts.length == 1 && parts[0] == 5);
+ assertNotNull(parts);
+ assertEquals(parts.length, 1);
+ assertEquals(parts[0], 5);
+ assertEquals((long)tuple.getBaseSeconds() << 32 | tuple.getWindowId(), startingWindowId, "Window");
+
+ serial = getSerializedRequest(null, id, down_type, upstream_id, 0, null, startingWindowId, 32 * 1024);
+ tuple = (SubscribeRequestTuple)Tuple.getTuple(serial, 0, serial.length);
+ assertEquals(tuple.getIdentifier(), id, "Identifier");
+ assertEquals(tuple.getStreamType(), down_type, "UpstreamType");
+ assertEquals(tuple.getUpstreamIdentifier(), upstream_id, "UpstreamId");
+ assertEquals(tuple.getMask(), 0, "Mask");
+ assertEquals(tuple.getBufferSize(), 32 * 1024, "BufferSize");
+ assertNull(tuple.getPartitions());
assertEquals((long)tuple.getBaseSeconds() << 32 | tuple.getWindowId(), startingWindowId, "Window");
}
-
}
[2/2] apex-core git commit: Merge branch 'APEXCORE-591' of https://github.com/vrozov/apex-core
Posted by tu...@apache.org.
Merge branch 'APEXCORE-591' of https://github.com/vrozov/apex-core
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/7ea7f607
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/7ea7f607
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/7ea7f607
Branch: refs/heads/master
Commit: 7ea7f6073310ea52fae22ab974d4ce08cdeecf1d
Parents: 99d3a95 97b298e
Author: Tushar R. Gosavi <tu...@apache.org>
Authored: Thu Jan 12 23:10:52 2017 +0530
Committer: Tushar R. Gosavi <tu...@apache.org>
Committed: Thu Jan 12 23:10:52 2017 +0530
----------------------------------------------------------------------
.../bufferserver/packet/DataTuple.java | 24 +------
.../bufferserver/packet/EmptyTuple.java | 32 ---------
.../packet/GenericRequestTuple.java | 37 ++++------
.../bufferserver/packet/PayloadTuple.java | 44 +++---------
.../packet/PublishRequestTuple.java | 5 ++
.../bufferserver/packet/PurgeRequestTuple.java | 5 ++
.../bufferserver/packet/RequestTuple.java | 31 +++------
.../bufferserver/packet/ResetRequestTuple.java | 5 ++
.../bufferserver/packet/ResetWindowTuple.java | 32 ++-------
.../packet/SubscribeRequestTuple.java | 72 +++++++-------------
.../datatorrent/bufferserver/packet/Tuple.java | 70 +++++++++----------
.../bufferserver/packet/WindowIdTuple.java | 37 +++-------
.../packet/SubscribeRequestTupleTest.java | 24 +++++--
13 files changed, 143 insertions(+), 275 deletions(-)
----------------------------------------------------------------------