Posted to commits@kudu.apache.org by mp...@apache.org on 2016/09/12 05:53:03 UTC

[1/2] kudu git commit: Fix one more source of flakiness in tablet_replacement-itest

Repository: kudu
Updated Branches:
  refs/heads/master 3a9ea63b9 -> 30099cf9e


Fix one more source of flakiness in tablet_replacement-itest

As in commit 30a8337f05a8b92faff9400298aae640b6ebbecb, there was a case
where we were electing a leader and assuming it could immediately
perform a config change, even though its initial NO_OP may not have
been committed yet.

Change-Id: I1924e3fd001887d166866252f268354827a9da41
Reviewed-on: http://gerrit.cloudera.org:8080/4345
Tested-by: Todd Lipcon <to...@apache.org>
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/81ee6c3f
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/81ee6c3f
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/81ee6c3f

Branch: refs/heads/master
Commit: 81ee6c3f3b3b05f8f145b823a2db5b1e7f80233e
Parents: 3a9ea63
Author: Todd Lipcon <to...@apache.org>
Authored: Fri Sep 9 09:52:22 2016 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Mon Sep 12 05:47:20 2016 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/tablet_replacement-itest.cc | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/81ee6c3f/src/kudu/integration-tests/tablet_replacement-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/tablet_replacement-itest.cc b/src/kudu/integration-tests/tablet_replacement-itest.cc
index 105bdc3..8324cea 100644
--- a/src/kudu/integration-tests/tablet_replacement-itest.cc
+++ b/src/kudu/integration-tests/tablet_replacement-itest.cc
@@ -152,6 +152,9 @@ TEST_F(TabletReplacementITest, TestMasterTombstoneOldReplicaOnReport) {
   ASSERT_OK(itest::StartElection(leader_ts, tablet_id, timeout));
   ASSERT_OK(itest::WaitForServersToAgree(timeout, ts_map_, tablet_id, 1)); // Wait for NO_OP.
 
+  // Wait until it has committed its NO_OP, so that we can perform a config change.
+  ASSERT_OK(itest::WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id, timeout));
+
   // Shut down the follower to be removed, then remove it from the config.
   // We will wait for the Master to be notified of the config change, then shut
   // down the rest of the cluster and bring the follower back up. The follower


[2/2] kudu git commit: Add RegexpKuduOperationsProducer class

Posted by mp...@apache.org.
Add RegexpKuduOperationsProducer class

This patch adds the RegexpKuduOperationsProducer class, which
serializes Flume Event objects into Kudu inserts or upserts by
decoding the event body into a string, parsing that string with a
regular expression, and mapping each named capture group to the
column of the same name. Parsed values are naively coerced to the
column's type.

This provides an easy-to-use yet flexible way to ingest data with
varying schemas into Kudu from Flume.
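
For concreteness, here is a minimal sketch (not part of this patch) of
how the producer is wired into the existing KuduSink through Flume
Context properties. It mirrors the createSink() helper in the test
added below; the table name, master address, and regular expression
are illustrative only, and the example class itself is hypothetical.

  // Hypothetical example; placed in the sink's package so the
  // test-only KuduSink(KuduClient) constructor is visible.
  package org.apache.kudu.flume.sink;

  import java.util.HashMap;

  import org.apache.flume.Context;
  import org.apache.flume.conf.Configurables;
  import org.apache.kudu.client.KuduClient;

  public class RegexpProducerWiringExample {
    public static void main(String[] args) {
      // A client pointing at the cluster; the test uses BaseKuduTest's syncClient.
      KuduClient client =
          new KuduClient.KuduClientBuilder("kudu-master:7051").build();

      // Sink-level properties plus producer-level properties, the latter
      // scoped by PRODUCER_PREFIX ("producer.").
      HashMap<String, String> params = new HashMap<>();
      params.put(KuduSinkConfigurationConstants.TABLE_NAME, "my_table");
      params.put(KuduSinkConfigurationConstants.MASTER_ADDRESSES, "kudu-master:7051");
      params.put(KuduSinkConfigurationConstants.PRODUCER,
          RegexpKuduOperationsProducer.class.getName());
      params.put(KuduSinkConfigurationConstants.PRODUCER_PREFIX
          + RegexpKuduOperationsProducer.PATTERN_PROP, "(?<key>\\d+),(?<name>\\w+)");
      params.put(KuduSinkConfigurationConstants.PRODUCER_PREFIX
          + RegexpKuduOperationsProducer.OPERATION_PROP, "upsert");

      KuduSink sink = new KuduSink(client);
      Configurables.configure(sink, new Context(params));
      // sink.setChannel(...) and sink.start() would follow, as in the test below.
    }
  }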

Change-Id: Ibd64542095f0064a21635ec76c46d6a1be98b7ea
Reviewed-on: http://gerrit.cloudera.org:8080/3883
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/30099cf9
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/30099cf9
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/30099cf9

Branch: refs/heads/master
Commit: 30099cf9e62bcfe9344bcd76807b080d423ef0e1
Parents: 81ee6c3
Author: Will Berkeley <wd...@gmail.com>
Authored: Thu Aug 4 18:13:35 2016 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Mon Sep 12 05:52:51 2016 +0000

----------------------------------------------------------------------
 .../sink/RegexpKuduOperationsProducer.java      | 287 +++++++++++++++++++
 .../sink/RegexpKuduOperationsProducerTest.java  | 211 ++++++++++++++
 2 files changed, 498 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/30099cf9/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducer.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducer.java
new file mode 100644
index 0000000..8d5e22e
--- /dev/null
+++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducer.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kudu.flume.sink;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.Upsert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>A regular expression serializer that generates one {@link Insert} or
+ * {@link Upsert} per match of the configured regular expression against the
+ * {@link Event} body. Values are coerced to the proper column types.
+ *
+ * Example: if the Kudu table has the schema
+ *
+ * key INT32
+ * name STRING
+ *
+ * and producer.pattern is '(?<key>\\d+),(?<name>\\w+)', then the
+ * RegexpKuduOperationsProducer will parse the string
+ *
+ * |12345,Mike||54321,Todd|
+ *
+ * into the rows (key=12345, name=Mike) and (key=54321, name=Todd).
+ *
+ * Note: this class relies on JDK7 named capturing groups, which are documented
+ * in {@link Pattern}.
+ *
+ * <p><strong>Regular Expression Kudu Operations Producer configuration parameters</strong>
+ *
+ * <table cellpadding=3 cellspacing=0 border=1>
+ * <tr>
+ *   <th>Property Name</th>
+ *   <th>Default</th>
+ *   <th>Required?</th>
+ *   <th>Description</th>
+ * </tr>
+ * <tr>
+ *   <td>producer.pattern</td>
+ *   <td></td>
+ *   <td>Yes</td>
+ *   <td>The regular expression used to parse the event body.</td>
+ * </tr>
+ * <tr>
+ *   <td>producer.encoding</td>
+ *   <td>utf-8</td>
+ *   <td>No</td>
+ *   <td>The charset of the event body.</td>
+ * </tr>
+ * <tr>
+ *   <td>producer.operation</td>
+ *   <td>upsert</td>
+ *   <td>No</td>
+ *   <td>Operation type used to write the event to Kudu. Must be 'insert' or
+ *   'upsert'.</td>
+ * </tr>
+ * <tr>
+ *   <td>producer.skipMissingColumn</td>
+ *   <td>false</td>
+ *   <td>No</td>
+ *   <td>Whether to ignore a column if it has no corresponding capture group, or
+ *   instead completely abandon the attempt to parse and insert/upsert the row.</td>
+ * </tr>
+ * <tr>
+ *   <td>producer.skipBadColumnValue</td>
+ *   <td>false</td>
+ *   <td>No</td>
+ *   <td>Whether to omit a column value from the row if its raw value cannot be
+ *   coerced to the right type, or instead completely abandon the attempt to parse
+ *   and insert/upsert the row.</td>
+ * </tr>
+ * <tr>
+ *   <td>producer.warnUnmatchedRows</td>
+ *   <td>true</td>
+ *   <td>No</td>
+ *   <td>Whether to log a warning about payloads that do not match the pattern. If
+ *   set to false, event bodies with no matches are silently dropped.</td>
+ * </tr>
+ * </table>
+ *
+ * @see Pattern
+ */
+public class RegexpKuduOperationsProducer implements KuduOperationsProducer {
+  private static final Logger logger = LoggerFactory.getLogger(RegexpKuduOperationsProducer.class);
+
+  public static final String PATTERN_PROP = "pattern";
+  public static final String ENCODING_PROP = "encoding";
+  public static final String DEFAULT_ENCODING = "utf-8";
+  public static final String OPERATION_PROP = "operation";
+  public static final String DEFAULT_OPERATION = "upsert";
+  public static final String SKIP_MISSING_COLUMN_PROP = "skipMissingColumn";
+  public static final boolean DEFAULT_SKIP_MISSING_COLUMN = false;
+  public static final String SKIP_BAD_COLUMN_VALUE_PROP = "skipBadColumnValue";
+  public static final boolean DEFAULT_SKIP_BAD_COLUMN_VALUE = false;
+  public static final String WARN_UNMATCHED_ROWS_PROP = "warnUnmatchedRows";
+  public static final boolean DEFAULT_WARN_UNMATCHED_ROWS = true;
+
+  private static final List<String> validOperations =
+      Lists.newArrayList("upsert", "insert");
+
+  private KuduTable table;
+  private Pattern pattern;
+  private Charset charset;
+  private String operation;
+  private boolean skipMissingColumn;
+  private boolean skipBadColumnValue;
+  private boolean warnUnmatchedRows;
+
+  public RegexpKuduOperationsProducer() {
+  }
+
+  @Override
+  public void configure(Context context) {
+    String regexp = context.getString(PATTERN_PROP);
+    Preconditions.checkArgument(regexp != null,
+        "Required parameter %s is not specified",
+        PATTERN_PROP);
+    try {
+      pattern = Pattern.compile(regexp);
+    } catch (PatternSyntaxException e) {
+      throw new IllegalArgumentException(
+          String.format("The pattern '%s' is invalid", PATTERN_PROP, regexp), e);
+    }
+    String charsetName = context.getString(ENCODING_PROP, DEFAULT_ENCODING);
+    try {
+      charset = Charset.forName(charsetName);
+    } catch (IllegalArgumentException e) {
+      throw new FlumeException(
+          String.format("Invalid or unsupported charset %s", charsetName), e);
+    }
+    operation = context.getString(OPERATION_PROP,
+        DEFAULT_OPERATION);
+    Preconditions.checkArgument(
+        validOperations.contains(operation.toLowerCase()),
+        "Unrecognized operation '%s'",
+        operation);
+    skipMissingColumn = context.getBoolean(SKIP_MISSING_COLUMN_PROP,
+        DEFAULT_SKIP_MISSING_COLUMN);
+    skipBadColumnValue = context.getBoolean(SKIP_BAD_COLUMN_VALUE_PROP,
+        DEFAULT_SKIP_BAD_COLUMN_VALUE);
+    warnUnmatchedRows = context.getBoolean(WARN_UNMATCHED_ROWS_PROP,
+        DEFAULT_WARN_UNMATCHED_ROWS);
+  }
+
+  @Override
+  public void initialize(KuduTable table) {
+    this.table = table;
+  }
+
+  @Override
+  public List<Operation> getOperations(Event event) throws FlumeException {
+    String raw = new String(event.getBody(), charset);
+    Matcher m = pattern.matcher(raw);
+    boolean match = false;
+    Schema schema = table.getSchema();
+    List<Operation> ops = Lists.newArrayList();
+    while (m.find()) {
+      match = true;
+      Operation op;
+      switch (operation.toLowerCase()) {
+        case "upsert":
+          op = table.newUpsert();
+          break;
+        case "insert":
+          op = table.newInsert();
+          break;
+        default:
+          throw new FlumeException(
+              String.format("Unrecognized operation type '%s' in getOperations: " +
+              "this should never happen!", operation));
+      }
+      PartialRow row = op.getRow();
+      for (ColumnSchema col : schema.getColumns()) {
+        try {
+          coerceAndSet(m.group(col.getName()), col.getName(), col.getType(), row);
+        } catch (NumberFormatException e) {
+          String msg = String.format(
+              "Raw value '%s' couldn't be parsed to type %s for column '%s'",
+              raw, col.getType(), col.getName());
+          logOrThrow(skipBadColumnValue, msg, e);
+        } catch (IllegalArgumentException e) {
+          String msg = String.format(
+              "Column '%s' has no matching group in '%s'",
+              col.getName(), raw);
+          logOrThrow(skipMissingColumn, msg, e);
+        } catch (Exception e) {
+          throw new FlumeException("Failed to create Kudu operation", e);
+        }
+      }
+      ops.add(op);
+    }
+    if (!match && warnUnmatchedRows) {
+      logger.warn("Failed to match the pattern '{}' in '{}'", pattern, raw);
+    }
+    return ops;
+  }
+
+  /**
+   * Coerces the string {@code rawVal} to the type {@code type} and sets the
+   * resulting value for column {@code colName} in {@code row}.
+   *
+   * @param rawVal the raw string column value
+   * @param colName the name of the column
+   * @param type the Kudu type to convert {@code rawVal} to
+   * @param row the row to set the value in
+   * @throws NumberFormatException if {@code rawVal} cannot be parsed as {@code type}.
+   */
+  private void coerceAndSet(String rawVal, String colName, Type type, PartialRow row)
+      throws NumberFormatException {
+    switch (type) {
+      case INT8:
+        row.addByte(colName, Byte.parseByte(rawVal));
+        break;
+      case INT16:
+        row.addShort(colName, Short.parseShort(rawVal));
+        break;
+      case INT32:
+        row.addInt(colName, Integer.parseInt(rawVal));
+        break;
+      case INT64:
+        row.addLong(colName, Long.parseLong(rawVal));
+        break;
+      case BINARY:
+        row.addBinary(colName, rawVal.getBytes(charset));
+        break;
+      case STRING:
+        row.addString(colName, rawVal);
+        break;
+      case BOOL:
+        row.addBoolean(colName, Boolean.parseBoolean(rawVal));
+        break;
+      case FLOAT:
+        row.addFloat(colName, Float.parseFloat(rawVal));
+        break;
+      case DOUBLE:
+        row.addDouble(colName, Double.parseDouble(rawVal));
+        break;
+      case TIMESTAMP:
+        row.addLong(colName, Long.parseLong(rawVal));
+        break;
+      default:
+        logger.warn("got unknown type {} for column '{}'-- ignoring this column",
+            type, colName);
+    }
+  }
+
+  private void logOrThrow(boolean log, String msg, Exception e)
+      throws FlumeException {
+    if (log) {
+      logger.warn(msg, e);
+    } else {
+      throw new FlumeException(msg, e);
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+}
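
The parsing step in getOperations() above is plain JDK named-group
matching (java.util.regex). The self-contained sketch below, which is
not part of the commit, replays the example from the class javadoc;
only the demo class name is invented.

  import java.util.regex.Matcher;
  import java.util.regex.Pattern;

  public class NamedGroupDemo {
    public static void main(String[] args) {
      // One match per row, one named capture group per column.
      Pattern pattern = Pattern.compile("(?<key>\\d+),(?<name>\\w+)");
      Matcher m = pattern.matcher("|12345,Mike||54321,Todd|");
      while (m.find()) {
        // getOperations() looks up each group by column name and then coerces
        // the raw string to the column type (row.addInt / row.addString here).
        System.out.println("key=" + m.group("key") + ", name=" + m.group("name"));
      }
      // Prints:
      //   key=12345, name=Mike
      //   key=54321, name=Todd
    }
  }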

http://git-wip-us.apache.org/repos/asf/kudu/blob/30099cf9/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java
new file mode 100644
index 0000000..d7734b6
--- /dev/null
+++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kudu.flume.sink;
+
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.MASTER_ADDRESSES;
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER;
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER_PREFIX;
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TABLE_NAME;
+import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.OPERATION_PROP;
+import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.PATTERN_PROP;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Sink;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.BaseKuduTest;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduTable;
+import org.junit.Test;
+
+public class RegexpKuduOperationsProducerTest extends BaseKuduTest {
+  private static final String TEST_REGEXP =
+      "(?<key>\\d+),(?<byteFld>\\d+),(?<shortFld>\\d+),(?<intFld>\\d+)," +
+      "(?<longFld>\\d+),(?<binaryFld>\\w+),(?<stringFld>\\w+),(?<boolFld>\\w+)," +
+      "(?<floatFld>\\d+\\.\\d*),(?<doubleFld>\\d+.\\d*)";
+
+  private KuduTable createNewTable(String tableName) throws Exception {
+    ArrayList<ColumnSchema> columns = new ArrayList<>(10);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("byteFld", Type.INT8).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("shortFld", Type.INT16).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("intFld", Type.INT32).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("longFld", Type.INT64).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("binaryFld", Type.BINARY).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("stringFld", Type.STRING).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("boolFld", Type.BOOL).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("floatFld", Type.FLOAT).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("doubleFld", Type.DOUBLE).build());
+    CreateTableOptions createOptions =
+        new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 3).setNumReplicas(1);
+    KuduTable table = createTable(tableName, new Schema(columns), createOptions);
+    return table;
+  }
+
+  @Test
+  public void testEmptyChannel() throws Exception {
+    testEvents(0, 1, "insert");
+  }
+
+  @Test
+  public void testOneEvent() throws Exception {
+    testEvents(1, 1, "insert");
+  }
+
+  @Test
+  public void testThreeEvents() throws Exception {
+    testEvents(3, 1, "insert");
+  }
+
+  @Test
+  public void testThreeEventsWithUpsert() throws Exception {
+    testEvents(3, 1, "upsert");
+  }
+
+  @Test
+  public void testOneEventTwoRowsEach() throws Exception {
+    testEvents(1, 2, "insert");
+  }
+
+  @Test
+  public void testTwoEventsTwoRowsEach() throws Exception {
+    testEvents(2, 2, "insert");
+  }
+
+  @Test
+  public void testTwoEventsTwoRowsEachWithUpsert() throws Exception {
+    testEvents(2, 2, "upsert");
+  }
+
+  private void testEvents(int eventCount, int perEventRowCount, String operation) throws Exception {
+    String tableName = String.format("test%sevents%srowseach%s",
+        eventCount, perEventRowCount, operation);
+    KuduTable table = createNewTable(tableName);
+    KuduSink sink = createSink(tableName, operation);
+
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+    sink.setChannel(channel);
+    sink.start();
+
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+
+    for (int i = 0; i < eventCount; i++) {
+      StringBuilder payload = new StringBuilder();
+      for (int j = 0; j < perEventRowCount; j++) {
+        String baseRow = "|1%1$d%2$d1,%1$d,%1$d,%1$d,%1$d,binary," +
+            "string,false,%1$d.%1$d,%1$d.%1$d,%1$d|";
+        String row = String.format(baseRow, i, j);
+        payload.append(row);
+      }
+      Event e = EventBuilder.withBody(payload.toString().getBytes());
+      channel.put(e);
+    }
+
+    if (eventCount > 0) {
+      // In the upsert case, re-send the rows of the first event (i.e. i == 0) with
+      // new values; all such upsert rows go in a single additional event.
+      if (operation.equals("upsert")) {
+        StringBuilder upserts = new StringBuilder();
+        for (int j = 0; j < perEventRowCount; j++) {
+          String row = String.format("|1%2$d%3$d1,%1$d,%1$d,%1$d,%1$d,binary," +
+              "string,false,%1$d.%1$d,%1$d.%1$d,%1$d|", 1, 0, j);
+          upserts.append(row);
+        }
+        Event e = EventBuilder.withBody(upserts.toString().getBytes());
+        channel.put(e);
+      }
+
+      // Also check some bad/corner cases.
+      String mismatchInInt = "|1,2,taco,4,5,x,y,true,1.0.2.0,999|";
+      String emptyString = "";
+      String[] testCases = {mismatchInInt, emptyString};
+      for (String testCase : testCases) {
+        Event e = EventBuilder.withBody(testCase.getBytes());
+        channel.put(e);
+      }
+    }
+
+    tx.commit();
+    tx.close();
+
+    Sink.Status status = sink.process();
+    if (eventCount == 0) {
+      assertTrue("incorrect status for empty channel", status == Sink.Status.BACKOFF);
+    } else {
+      assertTrue("incorrect status for non-empty channel", status != Sink.Status.BACKOFF);
+    }
+
+    List<String> rows = scanTableToStrings(table);
+    assertEquals(eventCount * perEventRowCount + " row(s) expected",
+      eventCount * perEventRowCount,
+      rows.size());
+
+    ArrayList<String> rightAnswers = new ArrayList<>(eventCount * perEventRowCount);
+    for (int i = 0; i < eventCount; i++) {
+      for (int j = 0; j < perEventRowCount; j++) {
+        int value = operation.equals("upsert") && i == 0 ? 1 : i;
+        String baseAnswer = "INT32 key=1%2$d%3$d1, INT8 byteFld=%1$d, INT16 shortFld=%1$d, " +
+            "INT32 intFld=%1$d, INT64 longFld=%1$d, BINARY binaryFld=\"binary\", " +
+            "STRING stringFld=string, BOOL boolFld=false, FLOAT floatFld=%1$d.%1$d, " +
+            "DOUBLE doubleFld=%1$d.%1$d";
+        String rightAnswer = String.format(baseAnswer, value, i, j);
+        rightAnswers.add(rightAnswer);
+      }
+    }
+    Collections.sort(rightAnswers);
+
+    for (int k = 0; k < eventCount * perEventRowCount; k++) {
+      assertEquals("incorrect row", rightAnswers.get(k), rows.get(k));
+    }
+  }
+
+  private KuduSink createSink(String tableName, String operation) {
+    return createSink(tableName, new Context(), operation);
+  }
+
+  private KuduSink createSink(String tableName, Context ctx, String operation) {
+    KuduSink sink = new KuduSink(syncClient);
+    HashMap<String, String> parameters = new HashMap<>();
+    parameters.put(TABLE_NAME, tableName);
+    parameters.put(MASTER_ADDRESSES, getMasterAddresses());
+    parameters.put(PRODUCER, RegexpKuduOperationsProducer.class.getName());
+    parameters.put(PRODUCER_PREFIX + PATTERN_PROP, TEST_REGEXP);
+    parameters.put(PRODUCER_PREFIX + OPERATION_PROP, operation);
+    Context context = new Context(parameters);
+    context.putAll(ctx.getParameters());
+    Configurables.configure(sink, context);
+    return sink;
+  }
+}
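
A closing usage note, not part of the commit: configure() can be
exercised without a running cluster, since it only validates the
pattern, charset, operation, and skip/warn flags. The sketch below
assumes, as the test's use of PRODUCER_PREFIX implies, that KuduSink
hands the producer a sub-context keyed by the bare property names
(without the "producer." prefix); the demo class name is invented.

  import java.util.HashMap;
  import java.util.Map;

  import org.apache.flume.Context;
  import org.apache.kudu.flume.sink.RegexpKuduOperationsProducer;

  public class ConfigureOnlyDemo {
    public static void main(String[] args) {
      Map<String, String> props = new HashMap<>();
      // Bare keys, as the producer sees them after the sink strips the prefix.
      props.put(RegexpKuduOperationsProducer.PATTERN_PROP, "(?<key>\\d+),(?<name>\\w+)");
      props.put(RegexpKuduOperationsProducer.OPERATION_PROP, "insert");

      RegexpKuduOperationsProducer producer = new RegexpKuduOperationsProducer();
      producer.configure(new Context(props));
      // An invalid regex would throw IllegalArgumentException here; an operation
      // other than insert/upsert would fail the Preconditions check in configure().
    }
  }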