You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/07/19 19:08:51 UTC
[1/2] flink git commit: [FLINK-9755][network] forward exceptions in
RemoteInputChannel#notifyBufferAvailable() to the responsible thread
Repository: flink
Updated Branches:
refs/heads/master 0cb7706da -> 95eadfe15
[FLINK-9755][network] forward exceptions in RemoteInputChannel#notifyBufferAvailable() to the responsible thread
This mainly involves state checks, but previously these exceptions were only swallowed
without re-registration or any other logging/handling. This may have led to
some threads stalling while waiting for a notification that never came.
This closes #6272.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5857f554
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5857f554
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5857f554
Branch: refs/heads/master
Commit: 5857f5543a7d9d3082d2f74342758d5a452a3c13
Parents: 0cb7706
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Jul 5 00:48:33 2018 +0200
Committer: Nico Kruber <ni...@data-artisans.com>
Committed: Thu Jul 19 17:01:19 2018 +0200
----------------------------------------------------------------------
.../io/network/buffer/BufferListener.java | 9 ++
.../io/network/buffer/LocalBufferPool.java | 36 ++------
.../partition/consumer/RemoteInputChannel.java | 52 ++++++-----
.../consumer/RemoteInputChannelTest.java | 90 ++++++++++++++++++--
4 files changed, 131 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5857f554/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
index 05b4156..4cc32c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
@@ -27,6 +27,15 @@ public interface BufferListener {
/**
* Notification callback if a buffer is recycled and becomes available in buffer pool.
*
+ * <p>Note: responsibility on recycling the given buffer is transferred to this implementation,
+ * including any errors that lead to exceptions being thrown!
+ *
+ * <p><strong>BEWARE:</strong> since this may be called from outside the thread that relies on
+ * the listener's logic, any exception that occurs with this handler should be forwarded to the
+ * responsible thread for handling and otherwise ignored in the processing of this method. The
+ * buffer pool forwards any {@link Throwable} from here upwards to a potentially unrelated call
+ * stack!
+ *
* @param buffer buffer that becomes available in buffer pool.
* @return true if the listener wants to be notified next time.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/5857f554/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index e874723..1596fde 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -262,8 +262,7 @@ class LocalBufferPool implements BufferPool {
if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
returnMemorySegment(segment);
return;
- }
- else {
+ } else {
listener = registeredListeners.poll();
if (listener == null) {
@@ -277,37 +276,18 @@ class LocalBufferPool implements BufferPool {
// We do not know which locks have been acquired before the recycle() or are needed in the
// notification and which other threads also access them.
// -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
- boolean success = false;
- boolean needMoreBuffers = false;
- try {
- needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
- success = true;
- } catch (Throwable ignored) {
- // handled below, under the lock
- }
+ // Note that in case of any exceptions notifyBufferAvailable() should recycle the buffer
+ // (either directly or later during error handling) and therefore eventually end up in this
+ // method again.
+ boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
- if (!success || needMoreBuffers) {
+ if (needMoreBuffers) {
synchronized (availableMemorySegments) {
if (isDestroyed) {
// cleanup tasks how they would have been done if we only had one synchronized block
- if (needMoreBuffers) {
- listener.notifyBufferDestroyed();
- }
- if (!success) {
- returnMemorySegment(segment);
- }
+ listener.notifyBufferDestroyed();
} else {
- if (needMoreBuffers) {
- registeredListeners.add(listener);
- }
- if (!success) {
- if (numberOfRequestedMemorySegments > currentPoolSize) {
- returnMemorySegment(segment);
- } else {
- availableMemorySegments.add(segment);
- availableMemorySegments.notify();
- }
- }
+ registeredListeners.add(listener);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5857f554/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 0f70d44..b94f48a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -360,32 +360,44 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
return false;
}
- boolean needMoreBuffers = false;
- synchronized (bufferQueue) {
- checkState(isWaitingForFloatingBuffers, "This channel should be waiting for floating buffers.");
+ boolean recycleBuffer = true;
+ try {
+ boolean needMoreBuffers = false;
+ synchronized (bufferQueue) {
+ checkState(isWaitingForFloatingBuffers,
+ "This channel should be waiting for floating buffers.");
+
+ // Important: double check the isReleased state inside synchronized block, so there is no
+ // race condition when notifyBufferAvailable and releaseAllResources running in parallel.
+ if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
+ isWaitingForFloatingBuffers = false;
+ recycleBuffer = false; // just in case
+ buffer.recycleBuffer();
+ return false;
+ }
- // Important: double check the isReleased state inside synchronized block, so there is no
- // race condition when notifyBufferAvailable and releaseAllResources running in parallel.
- if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
- isWaitingForFloatingBuffers = false;
- buffer.recycleBuffer();
- return false;
- }
+ recycleBuffer = false;
+ bufferQueue.addFloatingBuffer(buffer);
- bufferQueue.addFloatingBuffer(buffer);
+ if (bufferQueue.getAvailableBufferSize() == numRequiredBuffers) {
+ isWaitingForFloatingBuffers = false;
+ } else {
+ needMoreBuffers = true;
+ }
- if (bufferQueue.getAvailableBufferSize() == numRequiredBuffers) {
- isWaitingForFloatingBuffers = false;
- } else {
- needMoreBuffers = true;
+ if (unannouncedCredit.getAndAdd(1) == 0) {
+ notifyCreditAvailable();
+ }
}
- }
- if (unannouncedCredit.getAndAdd(1) == 0) {
- notifyCreditAvailable();
+ return needMoreBuffers;
+ } catch (Throwable t) {
+ if (recycleBuffer) {
+ buffer.recycleBuffer();
+ }
+ setError(t);
+ return false;
}
-
- return needMoreBuffers;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/5857f554/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 6c6fd96..6305492 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -52,9 +52,13 @@ import java.util.concurrent.Future;
import scala.Tuple2;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasProperty;
+import static org.hamcrest.Matchers.isA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@@ -452,7 +456,7 @@ public class RemoteInputChannelTest {
} catch (Throwable t) {
thrown = t;
} finally {
- cleanup(networkBufferPool, null, thrown, inputChannel);
+ cleanup(networkBufferPool, null, null, thrown, inputChannel);
}
}
@@ -528,7 +532,7 @@ public class RemoteInputChannelTest {
} catch (Throwable t) {
thrown = t;
} finally {
- cleanup(networkBufferPool, null, thrown, inputChannel);
+ cleanup(networkBufferPool, null, null, thrown, inputChannel);
}
}
@@ -618,7 +622,7 @@ public class RemoteInputChannelTest {
} catch (Throwable t) {
thrown = t;
} finally {
- cleanup(networkBufferPool, null, thrown, inputChannel);
+ cleanup(networkBufferPool, null, null, thrown, inputChannel);
}
}
@@ -687,7 +691,72 @@ public class RemoteInputChannelTest {
} catch (Throwable t) {
thrown = t;
} finally {
- cleanup(networkBufferPool, null, thrown, channel1, channel2, channel3);
+ cleanup(networkBufferPool, null, null, thrown, channel1, channel2, channel3);
+ }
+ }
+
+ /**
+ * Tests that failures are propagated correctly if
+ * {@link RemoteInputChannel#notifyBufferAvailable(Buffer)} throws an exception. Also tests that
+ * a second listener will be notified in this case.
+ */
+ @Test
+ public void testFailureInNotifyBufferAvailable() throws Exception {
+ // Setup
+ final int numExclusiveBuffers = 0;
+ final int numFloatingBuffers = 1;
+ final int numTotalBuffers = numExclusiveBuffers + numFloatingBuffers;
+ final NetworkBufferPool networkBufferPool = new NetworkBufferPool(
+ numTotalBuffers, 32);
+
+ final SingleInputGate inputGate = createSingleInputGate();
+ final RemoteInputChannel successfulRemoteIC = createRemoteInputChannel(inputGate);
+ inputGate.setInputChannel(successfulRemoteIC.partitionId.getPartitionId(), successfulRemoteIC);
+
+ successfulRemoteIC.requestSubpartition(0);
+
+ // late creation -> no exclusive buffers, also no requested subpartition in successfulRemoteIC
+ // (to trigger a failure in RemoteInputChannel#notifyBufferAvailable())
+ final RemoteInputChannel failingRemoteIC = createRemoteInputChannel(inputGate);
+ inputGate.setInputChannel(failingRemoteIC.partitionId.getPartitionId(), failingRemoteIC);
+
+ Buffer buffer = null;
+ Throwable thrown = null;
+ try {
+ final BufferPool bufferPool =
+ networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
+ inputGate.setBufferPool(bufferPool);
+
+ buffer = bufferPool.requestBufferBlocking();
+
+ // trigger subscription to buffer pool
+ failingRemoteIC.onSenderBacklog(1);
+ successfulRemoteIC.onSenderBacklog(numExclusiveBuffers + 1);
+ // recycling will call RemoteInputChannel#notifyBufferAvailable() which will fail and
+ // this exception will be swallowed and set as an error in failingRemoteIC
+ buffer.recycleBuffer();
+ buffer = null;
+ try {
+ failingRemoteIC.checkError();
+ fail("The input channel should have an error based on the failure in RemoteInputChannel#notifyBufferAvailable()");
+ } catch (IOException e) {
+ assertThat(e, hasProperty("cause", isA(IllegalStateException.class)));
+ }
+ // currently, the buffer is still enqueued in the bufferQueue of failingRemoteIC
+ assertEquals(0, bufferPool.getNumberOfAvailableMemorySegments());
+ buffer = successfulRemoteIC.requestBuffer();
+ assertNull("buffer should still remain in failingRemoteIC", buffer);
+
+ // releasing resources in failingRemoteIC should free the buffer again and immediately
+ // recycle it into successfulRemoteIC
+ failingRemoteIC.releaseAllResources();
+ assertEquals(0, bufferPool.getNumberOfAvailableMemorySegments());
+ buffer = successfulRemoteIC.requestBuffer();
+ assertNotNull("no buffer given to successfulRemoteIC", buffer);
+ } catch (Throwable t) {
+ thrown = t;
+ } finally {
+ cleanup(networkBufferPool, null, buffer, thrown, failingRemoteIC, successfulRemoteIC);
}
}
@@ -749,7 +818,7 @@ public class RemoteInputChannelTest {
} catch (Throwable t) {
thrown = t;
} finally {
- cleanup(networkBufferPool, executor, thrown, inputChannel);
+ cleanup(networkBufferPool, executor, null, thrown, inputChannel);
}
}
@@ -802,7 +871,7 @@ public class RemoteInputChannelTest {
} catch (Throwable t) {
thrown = t;
} finally {
- cleanup(networkBufferPool, executor, thrown, inputChannel);
+ cleanup(networkBufferPool, executor, null, thrown, inputChannel);
}
}
@@ -854,7 +923,7 @@ public class RemoteInputChannelTest {
} catch (Throwable t) {
thrown = t;
} finally {
- cleanup(networkBufferPool, executor, thrown, inputChannel);
+ cleanup(networkBufferPool, executor, null, thrown, inputChannel);
}
}
@@ -936,7 +1005,7 @@ public class RemoteInputChannelTest {
} catch (Throwable t) {
thrown = t;
} finally {
- cleanup(networkBufferPool, executor, thrown, inputChannel);
+ cleanup(networkBufferPool, executor, null, thrown, inputChannel);
}
}
@@ -1064,6 +1133,7 @@ public class RemoteInputChannelTest {
private void cleanup(
NetworkBufferPool networkBufferPool,
@Nullable ExecutorService executor,
+ @Nullable Buffer buffer,
@Nullable Throwable throwable,
InputChannel... inputChannels) throws Exception {
for (InputChannel inputChannel : inputChannels) {
@@ -1074,6 +1144,10 @@ public class RemoteInputChannelTest {
}
}
+ if (buffer != null && !buffer.isRecycled()) {
+ buffer.recycleBuffer();
+ }
+
try {
networkBufferPool.destroyAllBufferPools();
} catch (Throwable tInner) {
[2/2] flink git commit: [FLINK-9435][java] optimise
ComparableKeySelector and ArrayKeySelector for more efficient Tuple creation
Posted by nk...@apache.org.
[FLINK-9435][java] optimise ComparableKeySelector and ArrayKeySelector for more efficient Tuple creation
Benchmark results (2 runs) by running the benchmarks from
https://github.com/dataArtisans/flink-benchmarks/pull/5:
Benchmark Mode Cnt Score Error Units
================= old =================
KeyByBenchmarks.arrayKeyBy thrpt 30 1151.305 ± 21.096 ops/ms
KeyByBenchmarks.arrayKeyBy thrpt 30 1117.486 ± 43.508 ops/ms
KeyByBenchmarks.tupleKeyBy thrpt 30 1659.634 ± 28.627 ops/ms
KeyByBenchmarks.tupleKeyBy thrpt 30 1554.265 ± 82.604 ops/ms
================= new =================
KeyByBenchmarks.arrayKeyBy thrpt 30 1150.552 ± 51.185 ops/ms
KeyByBenchmarks.arrayKeyBy thrpt 30 1195.777 ± 10.621 ops/ms
KeyByBenchmarks.tupleKeyBy thrpt 30 1743.633 ± 27.109 ops/ms
KeyByBenchmarks.tupleKeyBy thrpt 30 1697.885 ± 22.101 ops/ms
This closes #6115.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/95eadfe1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/95eadfe1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/95eadfe1
Branch: refs/heads/master
Commit: 95eadfe15203ee0ab1459a9ade943234d9d6e7ce
Parents: 5857f55
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri May 25 00:09:37 2018 +0200
Committer: Nico Kruber <ni...@data-artisans.com>
Committed: Thu Jul 19 17:07:09 2018 +0200
----------------------------------------------------------------------
.../org/apache/flink/api/java/tuple/Tuple.java | 32 ++++++++++++++++++++
.../flink/api/java/tuple/TupleGenerator.java | 11 +++++++
.../streaming/util/keys/KeySelectorUtil.java | 10 +++---
3 files changed, 47 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/95eadfe1/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple.java b/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
index c282c59..7ce38f8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/tuple/Tuple.java
@@ -113,6 +113,38 @@ public abstract class Tuple implements java.io.Serializable {
// BEGIN_OF_TUPLE_DEPENDENT_CODE
// GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator.
+ public static Tuple newInstance(int arity) {
+ switch (arity) {
+ case 0: return Tuple0.INSTANCE;
+ case 1: return new Tuple1();
+ case 2: return new Tuple2();
+ case 3: return new Tuple3();
+ case 4: return new Tuple4();
+ case 5: return new Tuple5();
+ case 6: return new Tuple6();
+ case 7: return new Tuple7();
+ case 8: return new Tuple8();
+ case 9: return new Tuple9();
+ case 10: return new Tuple10();
+ case 11: return new Tuple11();
+ case 12: return new Tuple12();
+ case 13: return new Tuple13();
+ case 14: return new Tuple14();
+ case 15: return new Tuple15();
+ case 16: return new Tuple16();
+ case 17: return new Tuple17();
+ case 18: return new Tuple18();
+ case 19: return new Tuple19();
+ case 20: return new Tuple20();
+ case 21: return new Tuple21();
+ case 22: return new Tuple22();
+ case 23: return new Tuple23();
+ case 24: return new Tuple24();
+ case 25: return new Tuple25();
+ default: throw new IllegalArgumentException("The tuple arity must be in [0, " + MAX_ARITY + "].");
+ }
+ }
+
private static final Class<?>[] CLASSES = new Class<?>[] {
Tuple0.class, Tuple1.class, Tuple2.class, Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class, Tuple25.class
};
http://git-wip-us.apache.org/repos/asf/flink/blob/95eadfe1/flink-core/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java b/flink-core/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java
index bd5598a..d684967 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java
@@ -133,6 +133,17 @@ class TupleGenerator {
private static void modifyTupleType(File root) throws IOException {
// generate code
StringBuilder sb = new StringBuilder();
+ sb.append("\tpublic static Tuple newInstance(int arity) {\n");
+ sb.append("\t\tswitch (arity) {\n");
+ // special case for Tuple0:
+ sb.append("\t\t\tcase 0: return Tuple0.INSTANCE;\n");
+ for (int i = FIRST; i <= LAST; i++) {
+ sb.append("\t\t\tcase ").append(i).append(": return new Tuple").append(i).append("();\n");
+ }
+ sb.append("\t\t\tdefault: throw new IllegalArgumentException(\"The tuple arity must be in [0, \" + MAX_ARITY + \"].\");\n");
+ sb.append("\t\t}\n");
+ sb.append("\t}\n\n");
+
sb.append("\tprivate static final Class<?>[] CLASSES = new Class<?>[] {\n\t\tTuple0.class");
for (int i = FIRST; i <= LAST; i++) {
sb.append(", Tuple").append(i).append(".class");
http://git-wip-us.apache.org/repos/asf/flink/blob/95eadfe1/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
index 27ce573..ab608ef 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
@@ -180,8 +180,8 @@ public final class KeySelectorUtil {
}
@Override
- public Tuple getKey(IN value) throws Exception {
- Tuple key = Tuple.getTupleClass(keyLength).newInstance();
+ public Tuple getKey(IN value) {
+ Tuple key = Tuple.newInstance(keyLength);
comparator.extractKeys(value, keyArray, 0);
for (int i = 0; i < keyLength; i++) {
key.setField(keyArray[i], i);
@@ -210,18 +210,16 @@ public final class KeySelectorUtil {
private static final long serialVersionUID = 1L;
private final int[] fields;
- private final Class<? extends Tuple> tupleClass;
private transient TupleTypeInfo<Tuple> returnType;
ArrayKeySelector(int[] fields, TupleTypeInfo<Tuple> returnType) {
this.fields = requireNonNull(fields);
this.returnType = requireNonNull(returnType);
- this.tupleClass = Tuple.getTupleClass(fields.length);
}
@Override
- public Tuple getKey(IN value) throws Exception {
- Tuple key = tupleClass.newInstance();
+ public Tuple getKey(IN value) {
+ Tuple key = Tuple.newInstance(fields.length);
for (int i = 0; i < fields.length; i++) {
key.setField(Array.get(value, fields[i]), i);
}