You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/04/06 17:49:40 UTC
[7/7] flink git commit: [runtime] Remove old redundant classes -
StringRecord -> StringValue - IntegerRecord -> IntValue - FileRecord ->
obsolete
[runtime] Remove old redundant classes
- StringRecord -> StringValue
- IntegerRecord -> IntValue
- FileRecord -> obsolete
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/12c555f5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/12c555f5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/12c555f5
Branch: refs/heads/master
Commit: 12c555f5f31aa6a14d39d2a96bcf7412ec17495f
Parents: adc8530
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Apr 6 16:27:25 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 6 16:27:25 2015 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/core/fs/Path.java | 28 +-
.../flink/core/io/LocatableInputSplit.java | 7 +-
.../org/apache/flink/core/io/StringRecord.java | 701 -------------------
.../java/org/apache/flink/util/StringUtils.java | 17 +-
.../record/io/ExternalProcessInputSplit.java | 7 +-
.../runtime/event/task/StringTaskEvent.java | 26 +-
.../task/IterationSynchronizationSinkTask.java | 10 +-
.../impl/types/ProfilingDataContainer.java | 18 +-
.../apache/flink/runtime/types/FileRecord.java | 128 ----
.../flink/runtime/types/IntegerRecord.java | 100 ---
.../runtime/util/SerializableArrayList.java | 52 +-
.../flink/runtime/util/SerializableHashMap.java | 30 +-
.../flink/runtime/util/SerializableHashSet.java | 45 +-
.../IOManagerPerformanceBenchmark.java | 16 +-
.../io/network/DefaultChannelSelectorTest.java | 7 +-
.../SlotCountExceedingParallelismTest.java | 16 +-
.../ScheduleOrUpdateConsumersTest.java | 16 +-
.../flink/runtime/types/StringRecordTest.java | 111 ---
.../apache/flink/runtime/types/TypeTest.java | 88 ---
.../apache/flink/runtime/jobmanager/Tasks.scala | 63 +-
20 files changed, 140 insertions(+), 1346 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
index 30a2a65..5a15d6a 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
@@ -28,7 +28,6 @@ import java.net.URI;
import java.net.URISyntaxException;
import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.io.StringRecord;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.OperatingSystem;
@@ -473,20 +472,19 @@ public class Path implements IOReadableWritable, Serializable {
final boolean isNotNull = in.readBoolean();
if (isNotNull) {
- final String scheme = StringRecord.readString(in);
- final String userInfo = StringRecord.readString(in);
- final String host = StringRecord.readString(in);
+ final String scheme = StringUtils.readNullableString(in);
+ final String userInfo = StringUtils.readNullableString(in);
+ final String host = StringUtils.readNullableString(in);
final int port = in.readInt();
- final String path = StringRecord.readString(in);
- final String query = StringRecord.readString(in);
- final String fragment = StringRecord.readString(in);
+ final String path = StringUtils.readNullableString(in);
+ final String query = StringUtils.readNullableString(in);
+ final String fragment = StringUtils.readNullableString(in);
try {
uri = new URI(scheme, userInfo, host, port, path, query, fragment);
} catch (URISyntaxException e) {
- throw new IOException("Error reconstructing URI: " + StringUtils.stringifyException(e));
+ throw new IOException("Error reconstructing URI", e);
}
-
}
}
@@ -498,13 +496,13 @@ public class Path implements IOReadableWritable, Serializable {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
- StringRecord.writeString(out, uri.getScheme());
- StringRecord.writeString(out, uri.getUserInfo());
- StringRecord.writeString(out, uri.getHost());
+ StringUtils.writeNullableString(uri.getScheme(), out);
+ StringUtils.writeNullableString(uri.getUserInfo(), out);
+ StringUtils.writeNullableString(uri.getHost(), out);
out.writeInt(uri.getPort());
- StringRecord.writeString(out, uri.getPath());
- StringRecord.writeString(out, uri.getQuery());
- StringRecord.writeString(out, uri.getFragment());
+ StringUtils.writeNullableString(uri.getPath(), out);
+ StringUtils.writeNullableString(uri.getQuery(), out);
+ StringUtils.writeNullableString(uri.getFragment(), out);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java b/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java
index 9bf23a6..a373639 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.StringUtils;
/**
* A locatable input split is an input split referring to input data which is located on one or more hosts.
@@ -90,8 +91,8 @@ public class LocatableInputSplit implements InputSplit, java.io.Serializable {
public void write(DataOutputView out) throws IOException {
out.writeInt(this.splitNumber);
out.writeInt(this.hostnames.length);
- for (int i = 0; i < this.hostnames.length; i++) {
- StringRecord.writeString(out, this.hostnames[i]);
+ for (String hostname : this.hostnames) {
+ StringUtils.writeNullableString(hostname, out);
}
}
@@ -105,7 +106,7 @@ public class LocatableInputSplit implements InputSplit, java.io.Serializable {
} else {
this.hostnames = new String[numHosts];
for (int i = 0; i < numHosts; i++) {
- this.hostnames[i] = StringRecord.readString(in);
+ this.hostnames[i] = StringUtils.readNullableString(in);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-core/src/main/java/org/apache/flink/core/io/StringRecord.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/StringRecord.java b/flink-core/src/main/java/org/apache/flink/core/io/StringRecord.java
deleted file mode 100644
index d56b484..0000000
--- a/flink-core/src/main/java/org/apache/flink/core/io/StringRecord.java
+++ /dev/null
@@ -1,701 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
- * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
- * additional information regarding copyright ownership.
- */
-
-package org.apache.flink.core.io;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
-import java.nio.charset.CharsetDecoder;
-import java.nio.charset.CharsetEncoder;
-import java.nio.charset.CodingErrorAction;
-import java.nio.charset.MalformedInputException;
-import java.text.CharacterIterator;
-import java.text.StringCharacterIterator;
-import java.util.Arrays;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Value;
-
-/**
- * This class stores text using standard UTF8 encoding. It provides methods to
- * serialize, deserialize, and compare texts at byte level. The type of length
- * is integer and is serialized using zero-compressed format.
- * <p>
- * In addition, it provides methods for string traversal without converting the byte array to a string.
- * <p>
- * Also includes utilities for serializing/deserialing a string, coding/decoding a string, checking if a byte array
- * contains valid UTF8 code, calculating the length of an encoded string.
- */
-public class StringRecord implements Value {
-
- private static final long serialVersionUID = 1L;
-
- private static final ThreadLocal<CharsetEncoder> ENCODER_FACTORY = new ThreadLocal<CharsetEncoder>() {
- protected CharsetEncoder initialValue() {
- return Charset.forName("UTF-8").newEncoder().onMalformedInput(CodingErrorAction.REPORT)
- .onUnmappableCharacter(CodingErrorAction.REPORT);
- }
- };
-
- private static final ThreadLocal<CharsetDecoder> DECODER_FACTORY = new ThreadLocal<CharsetDecoder>() {
- protected CharsetDecoder initialValue() {
- return Charset.forName("UTF-8").newDecoder().onMalformedInput(CodingErrorAction.REPORT)
- .onUnmappableCharacter(CodingErrorAction.REPORT);
- }
- };
-
- /**
- * Cache the hash code for the encapsulated string.
- **/
- private int hash = 0;
-
- private static final byte[] EMPTY_BYTES = new byte[0];
-
- private byte[] bytes;
-
- private int length;
-
- public StringRecord() {
- this.bytes = EMPTY_BYTES;
- this.hash = 0;
- }
-
- /**
- * Construct from a string.
- */
- public StringRecord(final String string) {
- set(string);
- }
-
- /** Construct from another text. */
- public StringRecord(final StringRecord utf8) {
- set(utf8);
- }
-
- /**
- * Construct from a byte array.
- */
- public StringRecord(final byte[] utf8) {
- set(utf8);
- }
-
- /**
- * Returns the raw bytes; however, only data up to {@link #getLength()} is
- * valid.
- */
- public byte[] getBytes() {
- return bytes;
- }
-
- /**
- * Returns the Unicode Scalar Value (32-bit integer value) for the character
- * at <code>position</code>. Note that this method avoids using the
- * converter or doing String instantiation
- *
- * @return the Unicode scalar value at position or -1 if the position is
- * invalid or points to a trailing byte
- */
- public int charAt(final int position) {
- if (position > this.length) {
- return -1; // too long
- }
- if (position < 0) {
- return -1;
- }
-
- final ByteBuffer bb = (ByteBuffer) ByteBuffer.wrap(this.bytes).position(position);
- return bytesToCodePoint(bb.slice());
- }
-
- public int find(final String what) {
- return find(what, 0);
- }
-
- /**
- * Finds any occurrence of <code>what</code> in the backing buffer, starting
- * as position <code>start</code>. The starting position is measured in
- * bytes and the return value is in terms of byte position in the buffer.
- * The backing buffer is not converted to a string for this operation.
- *
- * @return byte position of the first occurence of the search string in the
- * UTF-8 buffer or -1 if not found
- */
- public int find(final String what, final int start) {
- try {
- final ByteBuffer src = ByteBuffer.wrap(this.bytes, 0, this.length);
- final ByteBuffer tgt = encode(what);
- final byte b = tgt.get();
- src.position(start);
-
- while (src.hasRemaining()) {
- if (b == src.get()) { // matching first byte
- src.mark(); // save position in loop
- tgt.mark(); // save position in target
- boolean found = true;
- final int pos = src.position() - 1;
- while (tgt.hasRemaining()) {
- if (!src.hasRemaining()) { // src expired first
- tgt.reset();
- src.reset();
- found = false;
- break;
- }
- if (!(tgt.get() == src.get())) {
- tgt.reset();
- src.reset();
- found = false;
- break; // no match
- }
- }
- if (found) {
- return pos;
- }
- }
- }
- return -1; // not found
- } catch (CharacterCodingException e) {
- // can't get here
- e.printStackTrace();
- return -1;
- }
- }
-
- /**
- * Set to contain the contents of a string.
- */
- public void set(final String string) {
- try {
- final ByteBuffer bb = encode(string, true);
- this.bytes = bb.array();
- this.length = bb.limit();
- this.hash = 0;
- } catch (CharacterCodingException e) {
- throw new RuntimeException("Should not have happened " + e.toString());
- }
- }
-
- /**
- * Set to a utf8 byte array
- */
- public void set(final byte[] utf8) {
- set(utf8, 0, utf8.length);
- }
-
- /** copy a text. */
- public void set(final StringRecord other) {
- set(other.getBytes(), 0, other.getLength());
- }
-
- /** Returns the number of bytes in the byte array */
- public int getLength() {
- return length;
- }
-
- /**
- * Set the Text to range of bytes
- *
- * @param utf8
- * the data to copy from
- * @param start
- * the first position of the new string
- * @param len
- * the number of bytes of the new string
- */
- public void set(final byte[] utf8, final int start, final int len) {
- setCapacity(len, false);
- System.arraycopy(utf8, start, bytes, 0, len);
- this.length = len;
- this.hash = 0;
- }
-
- /**
- * Append a range of bytes to the end of the given text
- *
- * @param utf8
- * the data to copy from
- * @param start
- * the first position to append from utf8
- * @param len
- * the number of bytes to append
- */
- public void append(final byte[] utf8, final int start, final int len) {
- setCapacity(length + len, true);
- System.arraycopy(utf8, start, bytes, length, len);
- this.length += len;
- this.hash = 0;
- }
-
- /**
- * Clear the string to empty.
- */
- public void clear() {
- this.length = 0;
- this.hash = 0;
- }
-
- /*
- * Sets the capacity of this Text object to <em>at least</em>
- * <code>len</code> bytes. If the current buffer is longer, then the
- * capacity and existing content of the buffer are unchanged. If
- * <code>len</code> is larger than the current capacity, the Text object's
- * capacity is increased to match.
- * @param len the number of bytes we need
- * @param keepData should the old data be kept
- */
- private void setCapacity(final int len, final boolean keepData) {
- if (this.bytes == null || this.bytes.length < len) {
- final byte[] newBytes = new byte[len];
- if (this.bytes != null && keepData) {
- System.arraycopy(this.bytes, 0, newBytes, 0, this.length);
- }
- this.bytes = newBytes;
- }
- }
-
- /**
- * Convert text back to string
- *
- * @see java.lang.Object#toString()
- */
- public String toString() {
- try {
- return decode(bytes, 0, length);
- } catch (CharacterCodingException e) {
- throw new RuntimeException("Should not have happened " + e.toString());
- }
- }
-
- /**
- * deserialize
- * @param in
- */
- public void read(final DataInputView in) throws IOException {
- final int newLength = in.readInt();
- setCapacity(newLength, false);
- in.readFully(this.bytes, 0, newLength);
- this.length = newLength;
- this.hash = 0;
- }
-
- /** Skips over one Text in the input. */
- public static void skip(final DataInput in) throws IOException {
- final int length = in.readInt();
- skipFully(in, length);
- }
-
- public static void skipFully(final DataInput in, final int len) throws IOException {
- int total = 0;
- int cur = 0;
-
- while ((total < len) && ((cur = in.skipBytes(len - total)) > 0)) {
- total += cur;
- }
-
- if (total < len) {
- throw new IOException("Not able to skip " + len + " bytes, possibly " + "due to end of input.");
- }
- }
-
- @Override
- public void write(final DataOutputView out) throws IOException {
- out.writeInt(this.length);
- out.write(this.bytes, 0, this.length);
- }
-
-
- @Override
- public boolean equals(final Object obj) {
-
- if (!(obj instanceof StringRecord)) {
- return false;
- }
-
- final StringRecord sr = (StringRecord) obj;
- if (this.length != sr.getLength()) {
- return false;
- }
-
- if (this.bytes.length == this.length && sr.bytes.length == sr.length) {
-
- return Arrays.equals(this.bytes, sr.bytes);
-
- } else {
-
- for (int i = 0; i < this.length; ++i) {
- if (this.bytes[i] != sr.bytes[i]) {
- return false;
- }
- }
- }
-
- return true;
- }
-
-
- @Override
- public int hashCode() {
-
- int h = this.hash;
- if (h == 0 && this.length > 0) {
- int off = 0;
- byte[] val = this.bytes;
- int len = this.length;
-
- for (int i = 0; i < len; i++) {
- h = 31 * h + val[off++];
- }
- this.hash = h;
- }
- return h;
-
- }
-
- // / STATIC UTILITIES FROM HERE DOWN
- /**
- * Converts the provided byte array to a String using the UTF-8 encoding. If
- * the input is malformed, replace by a default value.
- */
- public static String decode(final byte[] utf8) throws CharacterCodingException {
- return decode(ByteBuffer.wrap(utf8), true);
- }
-
- public static String decode(final byte[] utf8, final int start, final int length) throws CharacterCodingException {
- return decode(ByteBuffer.wrap(utf8, start, length), true);
- }
-
- /**
- * Converts the provided byte array to a String using the UTF-8 encoding. If <code>replace</code> is true, then
- * malformed input is replaced with the
- * substitution character, which is U+FFFD. Otherwise the method throws a
- * MalformedInputException.
- */
- public static String decode(final byte[] utf8, final int start, final int length, final boolean replace)
- throws CharacterCodingException {
- return decode(ByteBuffer.wrap(utf8, start, length), replace);
- }
-
- private static String decode(final ByteBuffer utf8, final boolean replace) throws CharacterCodingException {
- final CharsetDecoder decoder = DECODER_FACTORY.get();
- if (replace) {
- decoder.onMalformedInput(java.nio.charset.CodingErrorAction.REPLACE);
- decoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
- }
- final String str = decoder.decode(utf8).toString();
- // set decoder back to its default value: REPORT
- if (replace) {
- decoder.onMalformedInput(CodingErrorAction.REPORT);
- decoder.onUnmappableCharacter(CodingErrorAction.REPORT);
- }
- return str;
- }
-
- /**
- * Converts the provided String to bytes using the UTF-8 encoding. If the
- * input is malformed, invalid chars are replaced by a default value.
- *
- * @return ByteBuffer: bytes stores at ByteBuffer.array() and length is
- * ByteBuffer.limit()
- */
-
- public static ByteBuffer encode(final String string) throws CharacterCodingException {
- return encode(string, true);
- }
-
- /**
- * Converts the provided String to bytes using the UTF-8 encoding. If <code>replace</code> is true, then malformed
- * input is replaced with the
- * substitution character, which is U+FFFD. Otherwise the method throws a
- * MalformedInputException.
- *
- * @return ByteBuffer: bytes stores at ByteBuffer.array() and length is
- * ByteBuffer.limit()
- */
- public static ByteBuffer encode(final String string, final boolean replace) throws CharacterCodingException {
-
- final CharsetEncoder encoder = ENCODER_FACTORY.get();
- if (replace) {
- encoder.onMalformedInput(CodingErrorAction.REPLACE);
- encoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
- }
- ByteBuffer bytes = encoder.encode(CharBuffer.wrap(string.toCharArray()));
- if (replace) {
- encoder.onMalformedInput(CodingErrorAction.REPORT);
- encoder.onUnmappableCharacter(CodingErrorAction.REPORT);
- }
- return bytes;
- }
-
- /**
- * Read a UTF8 encoded string from in
- */
- public static String readString(final DataInput in) throws IOException {
-
- if (in.readBoolean()) {
- final int length = in.readInt();
- if (length < 0) {
- throw new IOException("length of StringRecord is " + length);
- }
-
- final byte[] bytes = new byte[length];
- in.readFully(bytes, 0, length);
- return decode(bytes);
- }
-
- return null;
- }
-
- /**
- * Write a UTF8 encoded string to out
- */
- public static int writeString(final DataOutput out, final String s) throws IOException {
-
- int length = 0;
-
- if (s == null) {
- out.writeBoolean(false);
- } else {
- out.writeBoolean(true);
- final ByteBuffer bytes = encode(s);
- length = bytes.limit();
- out.writeInt(length);
- out.write(bytes.array(), 0, length);
- }
- return length;
- }
-
- // //// states for validateUTF8
-
- private static final int LEAD_BYTE = 0;
-
- private static final int TRAIL_BYTE_1 = 1;
-
- private static final int TRAIL_BYTE = 2;
-
- /**
- * Check if a byte array contains valid utf-8
- *
- * @param utf8
- * byte array
- * @throws MalformedInputException
- * if the byte array contains invalid utf-8
- */
- public static void validateUTF8(final byte[] utf8) throws MalformedInputException {
- validateUTF8(utf8, 0, utf8.length);
- }
-
- /**
- * Check to see if a byte array is valid utf-8
- *
- * @param utf8
- * the array of bytes
- * @param start
- * the offset of the first byte in the array
- * @param len
- * the length of the byte sequence
- * @throws MalformedInputException
- * if the byte array contains invalid bytes
- */
- public static void validateUTF8(final byte[] utf8, final int start, final int len) throws MalformedInputException {
- int count = start;
- int leadByte = 0;
- int length = 0;
- int state = LEAD_BYTE;
- while (count < start + len) {
- final int aByte = ((int) utf8[count] & 0xFF);
-
- switch (state) {
- case LEAD_BYTE:
- leadByte = aByte;
- length = bytesFromUTF8[aByte];
-
- switch (length) {
- case 0: // check for ASCII
- if (leadByte > 0x7F) {
- throw new MalformedInputException(count);
- }
- break;
- case 1:
- if (leadByte < 0xC2 || leadByte > 0xDF) {
- throw new MalformedInputException(count);
- }
- state = TRAIL_BYTE_1;
- break;
- case 2:
- if (leadByte < 0xE0 || leadByte > 0xEF) {
- throw new MalformedInputException(count);
- }
- state = TRAIL_BYTE_1;
- break;
- case 3:
- if (leadByte < 0xF0 || leadByte > 0xF4) {
- throw new MalformedInputException(count);
- }
- state = TRAIL_BYTE_1;
- break;
- default:
- // too long! Longest valid UTF-8 is 4 bytes (lead + three)
- // or if < 0 we got a trail byte in the lead byte position
- throw new MalformedInputException(count);
- } // switch (length)
- break;
-
- case TRAIL_BYTE_1:
- if (leadByte == 0xF0 && aByte < 0x90) {
- throw new MalformedInputException(count);
- }
- if (leadByte == 0xF4 && aByte > 0x8F) {
- throw new MalformedInputException(count);
- }
- if (leadByte == 0xE0 && aByte < 0xA0) {
- throw new MalformedInputException(count);
- }
- if (leadByte == 0xED && aByte > 0x9F) {
- throw new MalformedInputException(count);
- }
- // falls through to regular trail-byte test!!
- case TRAIL_BYTE:
- if (aByte < 0x80 || aByte > 0xBF) {
- throw new MalformedInputException(count);
- }
- if (--length == 0) {
- state = LEAD_BYTE;
- } else {
- state = TRAIL_BYTE;
- }
- break;
- default:
- break;
- } // switch (state)
- count++;
- }
- }
-
- /**
- * Magic numbers for UTF-8. These are the number of bytes that <em>follow</em> a given lead byte. Trailing bytes
- * have the value -1. The
- * values 4 and 5 are presented in this table, even though valid UTF-8
- * cannot include the five and six byte sequences.
- */
- static final int[] bytesFromUTF8 = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
- 0,
- 0,
- 0,
- // trail bytes
- -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
- -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
- -1, -1, -1, -1, -1, -1, -1, -1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
- 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5,
- 5 };
-
- /**
- * Returns the next code point at the current position in the buffer. The
- * buffer's position will be incremented. Any mark set on this buffer will
- * be changed by this method!
- */
- public static int bytesToCodePoint(final ByteBuffer bytes) {
- bytes.mark();
- final byte b = bytes.get();
- bytes.reset();
- final int extraBytesToRead = bytesFromUTF8[(b & 0xFF)];
- if (extraBytesToRead < 0) {
- return -1; // trailing byte!
- }
- int ch = 0;
-
- switch (extraBytesToRead) {
- case 5:
- ch += (bytes.get() & 0xFF);
- ch <<= 6; /* remember, illegal UTF-8 */
- case 4:
- ch += (bytes.get() & 0xFF);
- ch <<= 6; /* remember, illegal UTF-8 */
- case 3:
- ch += (bytes.get() & 0xFF);
- ch <<= 6;
- case 2:
- ch += (bytes.get() & 0xFF);
- ch <<= 6;
- case 1:
- ch += (bytes.get() & 0xFF);
- ch <<= 6;
- case 0:
- ch += (bytes.get() & 0xFF);
- default:
- break;
-
- }
- ch -= offsetsFromUTF8[extraBytesToRead];
-
- return ch;
- }
-
- static final int[] offsetsFromUTF8 = { 0x00000000, 0x00003080, 0x000E2080, 0x03C82080, 0xFA082080, 0x82082080 };
-
- /**
- * For the given string, returns the number of UTF-8 bytes required to
- * encode the string.
- *
- * @param string
- * text to encode
- * @return number of UTF-8 bytes required to encode
- */
- public static int utf8Length(final String string) {
- final CharacterIterator iter = new StringCharacterIterator(string);
- char ch = iter.first();
- int size = 0;
- while (ch != CharacterIterator.DONE) {
- if ((ch >= 0xD800) && (ch < 0xDC00)) {
- // surrogate pair?
- char trail = iter.next();
- if ((trail > 0xDBFF) && (trail < 0xE000)) {
- // valid pair
- size += 4;
- } else {
- // invalid pair
- size += 3;
- iter.previous(); // rewind one
- }
- } else if (ch < 0x80) {
- size++;
- } else if (ch < 0x800) {
- size += 2;
- } else {
- // ch < 0x10000, that is, the largest char value
- size += 3;
- }
- ch = iter.next();
- }
- return size;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
index 0c4194a..9c7bd8b 100644
--- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
@@ -134,19 +134,16 @@ public final class StringUtils {
sb.append("&lt;");
} else if (c == '&') {
sb.append("&amp;");
+ } else if (c < ' ') {
+ // Unreadable throw away
} else {
- if (c < ' ') {
- // Unreadable throw away
- } else {
- sb.append(c);
- }
+ sb.append(c);
}
}
return sb.toString();
}
-
-
+
/**
* This method calls {@link Object#toString()} on the given object, unless the
* object is an array. In that case, it will use the {@link #arrayToString(Object)}
@@ -156,7 +153,7 @@ public final class StringUtils {
* @param o The object for which to create the string representation.
* @return The string representation of the object.
*/
- public static final String arrayAwareToString(Object o) {
+ public static String arrayAwareToString(Object o) {
if (o == null) {
return "null";
}
@@ -175,7 +172,7 @@ public final class StringUtils {
* @return The string representation of the array.
* @throws IllegalArgumentException If the given object is no array.
*/
- public static final String arrayToString(Object array) {
+ public static String arrayToString(Object array) {
if (array == null) {
throw new NullPointerException();
}
@@ -224,7 +221,7 @@ public final class StringUtils {
* @param str The string in which to replace the control characters.
* @return The string with the replaced characters.
*/
- public static final String showControlCharacters(String str) {
+ public static String showControlCharacters(String str) {
int len = str.length();
StringBuilder sb = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputSplit.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputSplit.java
index 7b5ac96..9f0b345 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputSplit.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputSplit.java
@@ -16,15 +16,14 @@
* limitations under the License.
*/
-
package org.apache.flink.api.java.record.io;
import java.io.IOException;
import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.core.io.StringRecord;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.StringUtils;
/**
* The ExternalProcessInputSplit contains all informations for {@link org.apache.flink.api.common.io.InputFormat} that read their data from external processes.
@@ -68,12 +67,12 @@ public class ExternalProcessInputSplit extends GenericInputSplit {
@Override
public void read(DataInputView in) throws IOException {
super.read(in);
- this.extProcessCommand = StringRecord.readString(in);
+ this.extProcessCommand = StringUtils.readNullableString(in);
}
@Override
public void write(DataOutputView out) throws IOException {
super.write(out);
- StringRecord.writeString(out, this.extProcessCommand);
+ StringUtils.writeNullableString(this.extProcessCommand, out);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StringTaskEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
index 7b6b3d5..87f2e91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
@@ -20,26 +20,24 @@ package org.apache.flink.runtime.event.task;
import java.io.IOException;
-import org.apache.flink.core.io.StringRecord;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.StringUtils;
/**
* This class provides a simple implementation of an event that holds a string value.
- *
*/
public class StringTaskEvent extends TaskEvent {
/**
* The string encapsulated by this event.
*/
- private String message = null;
+ private String message;
/**
* The default constructor implementation. It should only be used for deserialization.
*/
- public StringTaskEvent() {
- }
+ public StringTaskEvent() {}
/**
* Constructs a new string task event with the given string message.
@@ -62,22 +60,18 @@ public class StringTaskEvent extends TaskEvent {
@Override
- public void write(final DataOutputView out) throws IOException {
-
- StringRecord.writeString(out, this.message);
+ public void write(DataOutputView out) throws IOException {
+ StringUtils.writeNullableString(this.message, out);
}
-
@Override
public void read(final DataInputView in) throws IOException {
-
- this.message = StringRecord.readString(in);
+ this.message = StringUtils.readNullableString(in);
}
@Override
public int hashCode() {
-
if (this.message == null) {
return 0;
}
@@ -88,19 +82,13 @@ public class StringTaskEvent extends TaskEvent {
@Override
public boolean equals(final Object obj) {
-
if (!(obj instanceof StringTaskEvent)) {
return false;
}
final StringTaskEvent ste = (StringTaskEvent) obj;
-
if (this.message == null) {
- if (ste.getString() == null) {
- return true;
- }
-
- return false;
+ return ste.getString() == null;
}
return this.message.equals(ste.getString());
http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
index b222cb5..5eccd7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.types.IntValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.aggregators.Aggregator;
@@ -37,7 +38,6 @@ import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.RegularPactTask;
import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.runtime.types.IntegerRecord;
import org.apache.flink.types.Value;
import com.google.common.base.Preconditions;
@@ -52,7 +52,7 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
private static final Logger log = LoggerFactory.getLogger(IterationSynchronizationSinkTask.class);
- private MutableRecordReader<IntegerRecord> headEventReader;
+ private MutableRecordReader<IntValue> headEventReader;
private SyncEventHandler eventHandler;
@@ -73,7 +73,7 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
@Override
public void registerInputOutput() {
- this.headEventReader = new MutableRecordReader<IntegerRecord>(getEnvironment().getInputGate(0));
+ this.headEventReader = new MutableRecordReader<IntValue>(getEnvironment().getInputGate(0));
}
@Override
@@ -101,7 +101,7 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
getEnvironment().getUserClassLoader());
headEventReader.registerTaskEventListener(eventHandler, WorkerDoneEvent.class);
- IntegerRecord dummy = new IntegerRecord();
+ IntValue dummy = new IntValue();
while (!terminationRequested()) {
@@ -182,7 +182,7 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
return false;
}
- private void readHeadEventChannel(IntegerRecord rec) throws IOException {
+ private void readHeadEventChannel(IntValue rec) throws IOException {
// reset the handler
eventHandler.resetEndOfSuperstep();
http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/ProfilingDataContainer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/ProfilingDataContainer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/ProfilingDataContainer.java
index 60334db..b233b84 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/ProfilingDataContainer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/ProfilingDataContainer.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.profiling.impl.types;
import java.io.IOException;
@@ -25,7 +24,6 @@ import java.util.Iterator;
import java.util.Queue;
import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.io.StringRecord;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.StringUtils;
@@ -53,20 +51,20 @@ public class ProfilingDataContainer implements IOReadableWritable {
final int numberOfRecords = in.readInt();
for (int i = 0; i < numberOfRecords; i++) {
- final String className = StringRecord.readString(in);
- Class<? extends InternalProfilingData> clazz = null;
+ final String className = StringUtils.readNullableString(in);
+ Class<? extends InternalProfilingData> clazz;
try {
clazz = (Class<? extends InternalProfilingData>) Class.forName(className);
} catch (Exception e) {
- throw new IOException(StringUtils.stringifyException(e));
+ throw new IOException(e);
}
- InternalProfilingData profilingData = null;
+ InternalProfilingData profilingData ;
try {
profilingData = clazz.newInstance();
} catch (Exception e) {
- throw new IOException(StringUtils.stringifyException(e));
+ throw new IOException(e);
}
// Restore internal state
@@ -94,10 +92,8 @@ public class ProfilingDataContainer implements IOReadableWritable {
// Write the number of records
out.writeInt(this.queuedProfilingData.size());
// Write the records themselves
- final Iterator<InternalProfilingData> iterator = this.queuedProfilingData.iterator();
- while (iterator.hasNext()) {
- final InternalProfilingData profilingData = iterator.next();
- StringRecord.writeString(out, profilingData.getClass().getName());
+ for (InternalProfilingData profilingData : this.queuedProfilingData) {
+ StringUtils.writeNullableString(profilingData.getClass().getName(), out);
profilingData.write(out);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/main/java/org/apache/flink/runtime/types/FileRecord.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/types/FileRecord.java b/flink-runtime/src/main/java/org/apache/flink/runtime/types/FileRecord.java
deleted file mode 100644
index 473596a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/types/FileRecord.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.types;
-
-import java.io.IOException;
-import java.util.Arrays;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.io.StringRecord;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class FileRecord implements IOReadableWritable {
-
- private String fileName;
-
- private static final byte[] EMPTY_BYTES = new byte[0];
-
- private byte[] bytes;
-
- public FileRecord() {
- this.bytes = EMPTY_BYTES;
- fileName = "empty";
- }
-
- public FileRecord(final String fileName) {
- this.bytes = EMPTY_BYTES;
- this.fileName = fileName;
- }
-
- public void setFileName(final String fileName) {
- this.fileName = fileName;
- }
-
- public String getFileName() {
- return this.fileName;
- }
-
- public byte[] getDataBuffer() {
- return this.bytes;
- }
-
- /**
- * Append a range of bytes to the end of the given data.
- *
- * @param data
- * the data to copy from
- * @param start
- * the first position to append from data
- * @param len
- * the number of bytes to append
- */
- public void append(final byte[] data, final int start, final int len) {
- final int oldLength = this.bytes.length;
- setCapacity(this.bytes.length + len, true);
- System.arraycopy(data, start, this.bytes, oldLength, len);
- }
-
- private void setCapacity(final int len, final boolean keepData) {
-
- if (this.bytes == null || this.bytes.length < len) {
- final byte[] newBytes = new byte[len];
- if (this.bytes != null && keepData) {
- System.arraycopy(this.bytes, 0, newBytes, 0, this.bytes.length);
- }
- this.bytes = newBytes;
- }
- }
-
-
- @Override
- public void read(final DataInputView in) throws IOException {
- this.fileName = StringRecord.readString(in);
-
- final int newLength = in.readInt();
- this.bytes = new byte[newLength];
- in.readFully(this.bytes, 0, newLength);
- }
-
-
- @Override
- public void write(final DataOutputView out) throws IOException {
- StringRecord.writeString(out, fileName);
- out.writeInt(this.bytes.length);
- out.write(this.bytes, 0, this.bytes.length);
- }
-
-
- @Override
- public boolean equals(final Object obj) {
-
- if (!(obj instanceof FileRecord)) {
- return false;
- }
-
- final FileRecord fr = (FileRecord) obj;
-
- if (this.bytes.length != fr.bytes.length) {
- return false;
- }
-
- return Arrays.equals(this.bytes, fr.bytes);
- }
-
-
- @Override
- public int hashCode() {
-
- return (int) ((11L * this.bytes.length) % Integer.MAX_VALUE);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/main/java/org/apache/flink/runtime/types/IntegerRecord.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/types/IntegerRecord.java b/flink-runtime/src/main/java/org/apache/flink/runtime/types/IntegerRecord.java
deleted file mode 100644
index cf35e29..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/types/IntegerRecord.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.types;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-/**
- * This class represents record for integer values.
- */
-public class IntegerRecord implements IOReadableWritable {
-
- /**
- * The integer value represented by the record.
- */
- private int value = 0;
-
- /**
- * Constructs a new integer record with the given integer value.
- *
- * @param value
- * the integer value this record should wrap up
- */
- public IntegerRecord(final int value) {
- this.value = value;
- }
-
- /**
- * Constructs an empty integer record (Mainly used for
- * serialization, do not call this constructor in your program).
- */
- public IntegerRecord() {}
-
- /**
- * Returns the value of this integer record.
- *
- * @return the value of this integer record
- */
- public int getValue() {
- return this.value;
- }
-
- /**
- * Set the value of this integer record.
- *
- * @param value
- * the new value for this integer record
- */
- public void setValue(final int value) {
- this.value = value;
- }
-
- @Override
- public void read(final DataInputView in) throws IOException {
- // Simply read the value from the stream
- this.value = in.readInt();
- }
-
- @Override
- public void write(final DataOutputView out) throws IOException {
- // Simply write the value to the stream
- out.writeInt(this.value);
- }
-
- @Override
- public boolean equals(final Object obj) {
- if (!(obj instanceof IntegerRecord)) {
- return false;
- }
-
- final IntegerRecord ir = (IntegerRecord) obj;
- return (this.value == ir.value);
- }
-
-
- @Override
- public int hashCode() {
- return this.value;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableArrayList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableArrayList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableArrayList.java
index e28fa17..8793691 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableArrayList.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableArrayList.java
@@ -16,34 +16,25 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.util;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Iterator;
import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.io.StringRecord;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.StringUtils;
/**
* This class extends a standard {@link java.util.ArrayList} by implementing the
- * {@link org.apache.flink.core.io.IOReadableWritable} interface. As a result, array lists of this type can be used
- * with Nephele's RPC system.
- * <p>
- * This class is not thread-safe.
- *
+ * {@link org.apache.flink.core.io.IOReadableWritable} interface.
+ *
* @param <E>
* the type of object stored inside this array list
*/
public class SerializableArrayList<E extends IOReadableWritable> extends ArrayList<E> implements IOReadableWritable {
- /**
- * Generated serial version UID.
- */
private static final long serialVersionUID = 8196856588290198537L;
/**
@@ -63,44 +54,38 @@ public class SerializableArrayList<E extends IOReadableWritable> extends ArrayLi
super(initialCapacity);
}
-
@Override
public void write(final DataOutputView out) throws IOException {
out.writeInt(size());
- final Iterator<E> it = iterator();
- while (it.hasNext()) {
-
- final E element = it.next();
+ for (E element : this) {
// Write out type
- StringRecord.writeString(out, element.getClass().getName());
+ StringUtils.writeNullableString(element.getClass().getName(), out);
// Write out element itself
element.write(out);
}
}
-
- @SuppressWarnings("unchecked")
@Override
public void read(final DataInputView in) throws IOException {
-
// Make sure the list is empty
clear();
+
final int numberOfElements = in.readInt();
for (int i = 0; i < numberOfElements; i++) {
- final String elementType = StringRecord.readString(in);
- Class<E> clazz = null;
- try {
- clazz = (Class<E>) Class.forName(elementType);
- } catch (ClassNotFoundException e) {
- throw new IOException(StringUtils.stringifyException(e));
- }
+ final String elementType = StringUtils.readNullableString(in);
- E element = null;
+ E element;
try {
+ @SuppressWarnings("unchecked")
+ Class<E> clazz = (Class<E>) Class.forName(elementType);
element = clazz.newInstance();
- } catch (Exception e) {
- throw new IOException(StringUtils.stringifyException(e));
+ }
+ catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ catch (Exception e) {
+ throw new IOException(e);
}
element.read(in);
@@ -108,13 +93,8 @@ public class SerializableArrayList<E extends IOReadableWritable> extends ArrayLi
}
}
-
@Override
public boolean equals(Object obj) {
- if (!(obj instanceof SerializableArrayList<?>)) {
- return false;
- }
-
- return (obj instanceof SerializableArrayList) && super.equals(obj);
+ return obj instanceof SerializableArrayList<?> && super.equals(obj);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableHashMap.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableHashMap.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableHashMap.java
index b2121a7..2744fe8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableHashMap.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableHashMap.java
@@ -16,26 +16,20 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.util;
import java.io.IOException;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.io.StringRecord;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.StringUtils;
/**
* This class extends a standard {@link java.util.HashMap} by implementing the
- * {@link org.apache.flink.core.io.IOReadableWritable} interface. As a result, hash maps of this type can be used
- * with Nephele's RPC system.
- * <p>
- * This class is not thread-safe.
+ * {@link org.apache.flink.core.io.IOReadableWritable} interface.
*
* @param <K>
* the type of the key used in this hash map
@@ -56,16 +50,14 @@ public class SerializableHashMap<K extends IOReadableWritable, V extends IOReada
out.writeInt(size());
- final Iterator<Map.Entry<K, V>> it = entrySet().iterator();
-
- while (it.hasNext()) {
-
- final Map.Entry<K, V> entry = it.next();
+ for (Map.Entry<K, V> entry : entrySet()) {
final K key = entry.getKey();
final V value = entry.getValue();
- StringRecord.writeString(out, key.getClass().getName());
+
+ StringUtils.writeNullableString(key.getClass().getName(), out);
key.write(out);
- StringRecord.writeString(out, value.getClass().getName());
+
+ StringUtils.writeNullableString(value.getClass().getName(), out);
value.write(out);
}
}
@@ -79,8 +71,8 @@ public class SerializableHashMap<K extends IOReadableWritable, V extends IOReada
for (int i = 0; i < numberOfMapEntries; i++) {
- final String keyType = StringRecord.readString(in);
- Class<K> keyClass = null;
+ final String keyType = StringUtils.readNullableString(in);
+ Class<K> keyClass;
try {
keyClass = (Class<K>) Class.forName(keyType);
} catch (ClassNotFoundException e) {
@@ -96,15 +88,15 @@ public class SerializableHashMap<K extends IOReadableWritable, V extends IOReada
key.read(in);
- final String valueType = StringRecord.readString(in);
- Class<V> valueClass = null;
+ final String valueType = StringUtils.readNullableString(in);
+ Class<V> valueClass;
try {
valueClass = (Class<V>) Class.forName(valueType);
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.stringifyException(e));
}
- V value = null;
+ V value;
try {
value = valueClass.newInstance();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableHashSet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableHashSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableHashSet.java
index 502f000..fb7b06f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableHashSet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableHashSet.java
@@ -16,25 +16,19 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.util;
import java.io.IOException;
import java.util.HashSet;
-import java.util.Iterator;
import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.io.StringRecord;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.StringUtils;
/**
* This class extends a standard {@link java.util.HashSet} by implementing the
- * {@link org.apache.flink.core.io.IOReadableWritable} interface. As a result, hash sets of this type can be used
- * with Nephele's RPC system.
- * <p>
- * This class is not thread-safe.
+ * {@link org.apache.flink.core.io.IOReadableWritable} interface.
*
* @param <T>
* the type used in this hash set
@@ -49,49 +43,36 @@ public class SerializableHashSet<T extends IOReadableWritable> extends HashSet<T
@Override
public void write(final DataOutputView out) throws IOException {
-
out.writeInt(size());
- final Iterator<T> it = iterator();
-
- while (it.hasNext()) {
-
- final T entry = it.next();
- StringRecord.writeString(out, entry.getClass().getName());
+ for (T entry : this) {
+ StringUtils.writeNullableString(entry.getClass().getName(), out);
entry.write(out);
}
}
-
- @SuppressWarnings("unchecked")
- // TODO: See if type safety can be improved here
@Override
public void read(final DataInputView in) throws IOException {
-
final int numberOfMapEntries = in.readInt();
for (int i = 0; i < numberOfMapEntries; i++) {
+ final String type = StringUtils.readNullableString(in);
- final String type = StringRecord.readString(in);
- Class<T> clazz = null;
- try {
- clazz = (Class<T>) Class.forName(type);
- } catch (ClassNotFoundException e) {
- throw new IOException(StringUtils.stringifyException(e));
- }
-
- T entry = null;
+ T entry;
try {
+ @SuppressWarnings("unchecked")
+ Class<T> clazz = (Class<T>) Class.forName(type);
entry = clazz.newInstance();
- } catch (Exception e) {
- throw new IOException(StringUtils.stringifyException(e));
+ }
+ catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ catch (Exception e) {
+ throw new IOException(e);
}
entry.read(in);
-
add(entry);
}
-
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
index 89cc50d..c9ca9fb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.io.disk.iomanager;
import java.io.BufferedInputStream;
@@ -32,6 +31,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.List;
+import org.apache.flink.types.IntValue;
import org.junit.Assert;
import org.slf4j.Logger;
@@ -39,24 +39,14 @@ import org.slf4j.LoggerFactory;
import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
-import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
-import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
-import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.DefaultMemoryManagerTest;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.types.IntegerRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-/**
- *
- *
- */
+
public class IOManagerPerformanceBenchmark {
private static final Logger LOG = LoggerFactory.getLogger(IOManagerPerformanceBenchmark.class);
@@ -246,7 +236,7 @@ public class IOManagerPerformanceBenchmark {
private void speedTestStream(int bufferSize) throws IOException {
final FileIOChannel.ID tmpChannel = ioManager.createChannel();
- final IntegerRecord rec = new IntegerRecord(0);
+ final IntValue rec = new IntValue(0);
File tempFile = null;
DataOutputStream daos = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java
index 13aac80..a54f5d9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java
@@ -16,13 +16,12 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.io.network;
import static org.junit.Assert.assertEquals;
-import org.apache.flink.core.io.StringRecord;
import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
+import org.apache.flink.types.StringValue;
import org.junit.Test;
/**
@@ -37,8 +36,8 @@ public class DefaultChannelSelectorTest {
@Test
public void channelSelect() {
- final StringRecord dummyRecord = new StringRecord("abc");
- final RoundRobinChannelSelector<StringRecord> selector = new RoundRobinChannelSelector<StringRecord>();
+ final StringValue dummyRecord = new StringValue("abc");
+ final RoundRobinChannelSelector<StringValue> selector = new RoundRobinChannelSelector<StringValue>();
// Test with two channels
final int numberOfOutputChannels = 2;
int[] selectedChannels = selector.selectChannels(dummyRecord, numberOfOutputChannels);
http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
index 05812b5..f984ca9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.types.IntegerRecord;
+import org.apache.flink.types.IntValue;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -130,19 +130,19 @@ public class SlotCountExceedingParallelismTest {
public final static String CONFIG_KEY = "number-of-times-to-send";
- private RecordWriter<IntegerRecord> writer;
+ private RecordWriter<IntValue> writer;
private int numberOfTimesToSend;
@Override
public void registerInputOutput() {
- writer = new RecordWriter<IntegerRecord>(getEnvironment().getWriter(0));
+ writer = new RecordWriter<IntValue>(getEnvironment().getWriter(0));
numberOfTimesToSend = getTaskConfiguration().getInteger(CONFIG_KEY, 0);
}
@Override
public void invoke() throws Exception {
- final IntegerRecord subtaskIndex = new IntegerRecord(
+ final IntValue subtaskIndex = new IntValue(
getEnvironment().getIndexInSubtaskGroup());
try {
@@ -164,7 +164,7 @@ public class SlotCountExceedingParallelismTest {
public final static String CONFIG_KEY = "number-of-indexes-to-receive";
- private RecordReader<IntegerRecord> reader;
+ private RecordReader<IntValue> reader;
private int numberOfSubtaskIndexesToReceive;
@@ -173,9 +173,9 @@ public class SlotCountExceedingParallelismTest {
@Override
public void registerInputOutput() {
- reader = new RecordReader<IntegerRecord>(
+ reader = new RecordReader<IntValue>(
getEnvironment().getInputGate(0),
- IntegerRecord.class);
+ IntValue.class);
numberOfSubtaskIndexesToReceive = getTaskConfiguration().getInteger(CONFIG_KEY, 0);
receivedSubtaskIndexes = new BitSet(numberOfSubtaskIndexesToReceive);
@@ -184,7 +184,7 @@ public class SlotCountExceedingParallelismTest {
@Override
public void invoke() throws Exception {
try {
- IntegerRecord record;
+ IntValue record;
int numberOfReceivedSubtaskIndexes = 0;
http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
index 70bbc25..ade14a1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.types.IntegerRecord;
+import org.apache.flink.types.IntValue;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -131,7 +131,7 @@ public class ScheduleOrUpdateConsumersTest {
public final static String CONFIG_KEY = "number-of-times-to-send";
- private List<RecordWriter<IntegerRecord>> writers = Lists.newArrayListWithCapacity(2);
+ private List<RecordWriter<IntValue>> writers = Lists.newArrayListWithCapacity(2);
private int numberOfTimesToSend;
@@ -139,11 +139,11 @@ public class ScheduleOrUpdateConsumersTest {
public void registerInputOutput() {
// The order of intermediate result creation in the job graph specifies which produced
// result partition is pipelined/blocking.
- final RecordWriter<IntegerRecord> pipelinedWriter =
- new RecordWriter<IntegerRecord>(getEnvironment().getWriter(0));
+ final RecordWriter<IntValue> pipelinedWriter =
+ new RecordWriter<IntValue>(getEnvironment().getWriter(0));
- final RecordWriter<IntegerRecord> blockingWriter =
- new RecordWriter<IntegerRecord>(getEnvironment().getWriter(1));
+ final RecordWriter<IntValue> blockingWriter =
+ new RecordWriter<IntValue>(getEnvironment().getWriter(1));
writers.add(pipelinedWriter);
writers.add(blockingWriter);
@@ -153,11 +153,11 @@ public class ScheduleOrUpdateConsumersTest {
@Override
public void invoke() throws Exception {
- final IntegerRecord subtaskIndex = new IntegerRecord(
+ final IntValue subtaskIndex = new IntValue(
getEnvironment().getIndexInSubtaskGroup());
// Produce the first intermediate result and then the second in a serial fashion.
- for (RecordWriter<IntegerRecord> writer : writers) {
+ for (RecordWriter<IntValue> writer : writers) {
try {
for (int i = 0; i < numberOfTimesToSend; i++) {
writer.emit(subtaskIndex);
http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/test/java/org/apache/flink/runtime/types/StringRecordTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/types/StringRecordTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/types/StringRecordTest.java
deleted file mode 100644
index 6bd2f30..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/types/StringRecordTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.types;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.when;
-import static org.mockito.MockitoAnnotations.initMocks;
-
-import java.io.DataInput;
-import java.io.IOException;
-
-import org.apache.flink.core.io.StringRecord;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mock;
-
-
-/**
- * TODO: {@link StringRecord} has a lot of public methods that need to be tested.
- */
-public class StringRecordTest {
-
- @Mock
- private DataInput inputMock;
-
- @Before
- public void setUp() {
- initMocks(this);
- }
-
- /**
- * Tests the serialization/deserialization of the {@link StringRecord} class.
- */
- @Test
- public void testStringRecord() {
-
- final StringRecord orig = new StringRecord("Test Record");
-
- try {
-
- final StringRecord copy = (StringRecord) CommonTestUtils.createCopyWritable(orig);
-
- assertEquals(orig.getLength(), copy.getLength());
- assertEquals(orig.toString(), copy.toString());
- assertEquals(orig, copy);
- assertEquals(orig.hashCode(), copy.hashCode());
-
- } catch (IOException ioe) {
- fail(ioe.getMessage());
- }
- }
-
- @Test
- public void shouldReadProperInputs() {
- try {
-
- when(this.inputMock.readBoolean()).thenReturn(true);
- when(this.inputMock.readInt()).thenReturn(10);
-
- final String readString = StringRecord.readString(inputMock);
- assertThat(readString, is(not(nullValue())));
-
- } catch (IOException ioe) {
- fail(ioe.getMessage());
- }
-
- }
-
- @Test
- public void shouldReadNegativeInputs() {
-
- try {
-
- when(this.inputMock.readBoolean()).thenReturn(true);
- when(this.inputMock.readInt()).thenReturn(-1);
- } catch(IOException ioe) {
- fail(ioe.getMessage());
- }
-
- try {
- StringRecord.readString(inputMock);
- } catch(IOException ioe) {
- return;
- }
-
- fail("StringRecord.readString did not throw an IOException for negative length of string");
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/test/java/org/apache/flink/runtime/types/TypeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/types/TypeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/types/TypeTest.java
deleted file mode 100644
index 4f02c99..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/types/TypeTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.types;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.runtime.types.FileRecord;
-import org.apache.flink.runtime.types.IntegerRecord;
-import org.junit.Test;
-
-/**
- * This class contains test which check the correct serialization/deserialization of Nephele's built-in data types.
- *
- */
-public class TypeTest {
-
- /**
- * Tests the serialization/deserialization of the {@link FileRecord} class.
- */
- @Test
- public void testFileRecord() {
-
- final FileRecord orig = new FileRecord("Test Filename");
- final byte[] data = new byte[128];
-
- orig.append(data, 0, data.length);
- orig.append(data, 0, data.length);
-
- assertEquals(orig.getDataBuffer().length, 2 * data.length);
-
- try {
- final FileRecord copy = (FileRecord) CommonTestUtils.createCopyWritable(orig);
-
- assertEquals(orig.getFileName(), copy.getFileName());
- assertEquals(orig, copy);
- assertEquals(orig.hashCode(), copy.hashCode());
-
- } catch (IOException ioe) {
- fail(ioe.getMessage());
- }
-
- }
-
- /**
- * Tests the serialization/deserialization of the {@link IntegerRecord} class.
- */
- @Test
- public void testIntegerRecord() {
-
- final IntegerRecord orig = new IntegerRecord(12);
-
- try {
-
- final IntegerRecord copy = (IntegerRecord) CommonTestUtils.createCopyWritable(orig);
-
- assertEquals(orig.getValue(), copy.getValue());
- assertEquals(orig, copy);
- assertEquals(orig.hashCode(), copy.hashCode());
-
- } catch (IOException ioe) {
- fail(ioe.getMessage());
- }
-
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
index 9bb1a3b..f6f7cc6 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
@@ -21,7 +21,8 @@ package org.apache.flink.runtime.jobmanager
import org.apache.flink.runtime.io.network.api.reader.RecordReader
import org.apache.flink.runtime.io.network.api.writer.RecordWriter
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
-import org.apache.flink.runtime.types.IntegerRecord
+import org.apache.flink.types.IntValue
+
object Tasks {
class BlockingNoOpInvokable extends AbstractInvokable {
@@ -52,15 +53,15 @@ object Tasks {
}
class Sender extends AbstractInvokable{
- var writer: RecordWriter[IntegerRecord] = _
+ var writer: RecordWriter[IntValue] = _
override def registerInputOutput(): Unit = {
- writer = new RecordWriter[IntegerRecord](getEnvironment.getWriter(0))
+ writer = new RecordWriter[IntValue](getEnvironment.getWriter(0))
}
override def invoke(): Unit = {
try{
- writer.emit(new IntegerRecord(42))
- writer.emit(new IntegerRecord(1337))
+ writer.emit(new IntValue(42))
+ writer.emit(new IntValue(1337))
writer.flush()
}finally{
writer.clearBuffers()
@@ -69,12 +70,12 @@ object Tasks {
}
class Forwarder extends AbstractInvokable {
- var reader: RecordReader[IntegerRecord] = _
- var writer: RecordWriter[IntegerRecord] = _
+ var reader: RecordReader[IntValue] = _
+ var writer: RecordWriter[IntValue] = _
override def registerInputOutput(): Unit = {
- reader = new RecordReader[IntegerRecord](getEnvironment.getInputGate(0),
- classOf[IntegerRecord])
- writer = new RecordWriter[IntegerRecord](getEnvironment.getWriter(0))
+ reader = new RecordReader[IntValue](getEnvironment.getInputGate(0),
+ classOf[IntValue])
+ writer = new RecordWriter[IntValue](getEnvironment.getWriter(0))
}
override def invoke(): Unit = {
@@ -97,12 +98,12 @@ object Tasks {
}
class Receiver extends AbstractInvokable {
- var reader: RecordReader[IntegerRecord] = _
+ var reader: RecordReader[IntValue] = _
override def registerInputOutput(): Unit = {
val env = getEnvironment
- reader = new RecordReader[IntegerRecord](env.getInputGate(0), classOf[IntegerRecord])
+ reader = new RecordReader[IntValue](env.getInputGate(0), classOf[IntValue])
}
override def invoke(): Unit = {
@@ -154,12 +155,12 @@ object Tasks {
}
class AgnosticReceiver extends AbstractInvokable {
- var reader: RecordReader[IntegerRecord] = _
+ var reader: RecordReader[IntValue] = _
override def registerInputOutput(): Unit = {
val env = getEnvironment
- reader = new RecordReader[IntegerRecord](env.getInputGate(0), classOf[IntegerRecord])
+ reader = new RecordReader[IntValue](env.getInputGate(0), classOf[IntValue])
}
override def invoke(): Unit = {
@@ -168,14 +169,14 @@ object Tasks {
}
class AgnosticBinaryReceiver extends AbstractInvokable {
- var reader1: RecordReader[IntegerRecord] = _
- var reader2: RecordReader[IntegerRecord] = _
+ var reader1: RecordReader[IntValue] = _
+ var reader2: RecordReader[IntValue] = _
override def registerInputOutput(): Unit = {
val env = getEnvironment
- reader1 = new RecordReader[IntegerRecord](env.getInputGate(0), classOf[IntegerRecord])
- reader2 = new RecordReader[IntegerRecord](env.getInputGate(1), classOf[IntegerRecord])
+ reader1 = new RecordReader[IntValue](env.getInputGate(0), classOf[IntValue])
+ reader2 = new RecordReader[IntValue](env.getInputGate(1), classOf[IntValue])
}
override def invoke(): Unit = {
@@ -185,16 +186,16 @@ object Tasks {
}
class AgnosticTertiaryReceiver extends AbstractInvokable {
- var reader1: RecordReader[IntegerRecord] = _
- var reader2: RecordReader[IntegerRecord] = _
- var reader3: RecordReader[IntegerRecord] = _
+ var reader1: RecordReader[IntValue] = _
+ var reader2: RecordReader[IntValue] = _
+ var reader3: RecordReader[IntValue] = _
override def registerInputOutput(): Unit = {
val env = getEnvironment
- reader1 = new RecordReader[IntegerRecord](env.getInputGate(0), classOf[IntegerRecord])
- reader2 = new RecordReader[IntegerRecord](env.getInputGate(1), classOf[IntegerRecord])
- reader3 = new RecordReader[IntegerRecord](env.getInputGate(2), classOf[IntegerRecord])
+ reader1 = new RecordReader[IntValue](env.getInputGate(0), classOf[IntValue])
+ reader2 = new RecordReader[IntValue](env.getInputGate(1), classOf[IntValue])
+ reader3 = new RecordReader[IntValue](env.getInputGate(2), classOf[IntValue])
}
override def invoke(): Unit = {
@@ -205,10 +206,10 @@ object Tasks {
}
class ExceptionSender extends AbstractInvokable{
- var writer: RecordWriter[IntegerRecord] = _
+ var writer: RecordWriter[IntValue] = _
override def registerInputOutput(): Unit = {
- writer = new RecordWriter[IntegerRecord](getEnvironment.getWriter(0))
+ writer = new RecordWriter[IntValue](getEnvironment.getWriter(0))
}
override def invoke(): Unit = {
@@ -217,10 +218,10 @@ object Tasks {
}
class SometimesExceptionSender extends AbstractInvokable {
- var writer: RecordWriter[IntegerRecord] = _
+ var writer: RecordWriter[IntValue] = _
override def registerInputOutput(): Unit = {
- writer = new RecordWriter[IntegerRecord](getEnvironment.getWriter(0))
+ writer = new RecordWriter[IntValue](getEnvironment.getWriter(0))
}
override def invoke(): Unit = {
@@ -240,7 +241,7 @@ object Tasks {
class ExceptionReceiver extends AbstractInvokable {
override def registerInputOutput(): Unit = {
- new RecordReader[IntegerRecord](getEnvironment.getInputGate(0), classOf[IntegerRecord])
+ new RecordReader[IntValue](getEnvironment.getInputGate(0), classOf[IntValue])
}
override def invoke(): Unit = {
@@ -266,7 +267,7 @@ object Tasks {
}
override def registerInputOutput(): Unit = {
- new RecordWriter[IntegerRecord](getEnvironment.getWriter(0))
+ new RecordWriter[IntValue](getEnvironment.getWriter(0))
}
override def invoke(): Unit = {
@@ -281,7 +282,7 @@ object Tasks {
class BlockingReceiver extends AbstractInvokable {
override def registerInputOutput(): Unit = {
- new RecordReader[IntegerRecord](getEnvironment.getInputGate(0), classOf[IntegerRecord])
+ new RecordReader[IntValue](getEnvironment.getInputGate(0), classOf[IntValue])
}
override def invoke(): Unit = {