You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2017/03/16 19:58:02 UTC
kudu git commit: ITBLL: clean up exception handling
Repository: kudu
Updated Branches:
refs/heads/master 2e462afc8 -> 206a3f1f5
ITBLL: clean up exception handling
Change-Id: I8fcd624e709d0f3a931055c1b4b38aab2d5b2e37
Reviewed-on: http://gerrit.cloudera.org:8080/6407
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Tested-by: Kudu Jenkins
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/206a3f1f
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/206a3f1f
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/206a3f1f
Branch: refs/heads/master
Commit: 206a3f1f567cca961bb71c3061883c0484e64d3e
Parents: 2e462af
Author: Dan Burkert <da...@apache.org>
Authored: Wed Mar 15 12:26:55 2017 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Thu Mar 16 19:57:10 2017 +0000
----------------------------------------------------------------------
.../tools/IntegrationTestBigLinkedList.java | 188 +++++++------------
1 file changed, 67 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/206a3f1f/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java
----------------------------------------------------------------------
diff --git a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java
index 6171026..0eb1543 100644
--- a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java
+++ b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java
@@ -24,7 +24,6 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -34,8 +33,6 @@ import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
@@ -63,6 +60,8 @@ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
@@ -73,6 +72,7 @@ import org.apache.kudu.client.AbstractKuduScannerBuilder;
import org.apache.kudu.client.Bytes;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
@@ -89,7 +89,7 @@ import org.apache.kudu.util.Pair;
/**
* <p>
* This is an integration test borrowed from goraci, written by Keith Turner,
- * which is in turn inspired by the Accumulo test called continous ingest (ci).
+ * which is in turn inspired by the Accumulo test called continuous ingest (ci).
* The original source code can be found here:
* </p>
* <ul>
@@ -199,7 +199,7 @@ import org.apache.kudu.util.Pair;
* Delete - Disabled. A standalone program that deletes a single node
* </li>
* <li>
- * Walker - Disabled. A standalong program that start following a linked list and emits timing
+ * Walker - Disabled. A standalone program that start following a linked list and emits timing
* info.
* </li>
* </ul>
@@ -219,7 +219,7 @@ import org.apache.kudu.util.Pair;
* spread over the Long.MIN_VALUE - Long.MAX_VALUE keyspace.
* </li>
* <li>
- * The Walker and Deleter progams were disabled to save some time but they can be re-enabled then
+ * The Walker and Deleter programs were disabled to save some time but they can be re-enabled then
* ported to Kudu without too much effort.
* </li>
* </ul>
@@ -370,7 +370,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
*/
static class Generator extends Configured implements Tool {
- private static final Log LOG = LogFactory.getLog(Generator.class);
+ private static final Logger LOG = LoggerFactory.getLogger(Generator.class);
private CommandLineParser parser;
private KuduClient client;
@@ -435,7 +435,6 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
public boolean nextKeyValue() throws IOException, InterruptedException {
return count++ < numNodes;
}
-
}
@Override
@@ -510,17 +509,13 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
private int width;
@Override
- protected void setup(Context context) throws IOException, InterruptedException {
+ protected void setup(Context context) throws KuduException {
id = "Job: " + context.getJobID() + " Task: " + context.getTaskAttemptID();
Configuration conf = context.getConfiguration();
CommandLineParser parser = new CommandLineParser(conf);
client = parser.getClient();
- try {
- table = client.openTable(getTableName(conf));
- headsTable = client.openTable(getHeadsTable(conf));
- } catch (Exception e) {
- throw new IOException(e);
- }
+ table = client.openTable(getTableName(conf));
+ headsTable = client.openTable(getHeadsTable(conf));
session = client.newSession();
session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
session.setMutationBufferSpace(WIDTH_DEFAULT);
@@ -538,14 +533,9 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
}
@Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- try {
+ protected void cleanup(Context context) throws KuduException {
session.close();
client.shutdown();
- } catch (Exception ex) {
- // ugh.
- throw new IOException(ex);
- }
}
@Override
@@ -578,12 +568,8 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
PartialRow row = insert.getRow();
row.addLong(COLUMN_KEY_ONE, Bytes.getLong(first[0]));
row.addLong(COLUMN_KEY_TWO, Bytes.getLong(first[0], 8));
- try {
- session.apply(insert);
- session.flush();
- } catch (Exception e) {
- throw new IOException("Couldn't flush the head row, " + insert, e);
- }
+ session.apply(insert);
+ session.flush();
first = null;
prev = null;
@@ -599,46 +585,41 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
first[first.length - 1] = ez;
}
- private void persist(Context output, byte[][] data, boolean update)
- throws IOException {
- try {
- for (int i = 0; i < data.length; i++) {
- Operation put = update ? table.newUpdate() : table.newInsert();
- PartialRow row = put.getRow();
-
- long keyOne = Bytes.getLong(data[i]);
- long keyTwo = Bytes.getLong(data[i], 8);
-
- row.addLong(COLUMN_KEY_ONE, keyOne);
- row.addLong(COLUMN_KEY_TWO, keyTwo);
-
- // prev is null for the first line, we'll update it at the end.
- if (prev == null) {
- row.setNull(COLUMN_PREV_ONE);
- row.setNull(COLUMN_PREV_TWO);
- } else {
- row.addLong(COLUMN_PREV_ONE, Bytes.getLong(prev[i]));
- row.addLong(COLUMN_PREV_TWO, Bytes.getLong(prev[i], 8));
- }
+ private void persist(Context output, byte[][] data, boolean update) throws KuduException {
+ for (int i = 0; i < data.length; i++) {
+ Operation put = update ? table.newUpdate() : table.newInsert();
+ PartialRow row = put.getRow();
- if (!update) {
- // We only add those for new inserts, we don't update the heads with a new row, etc.
- row.addLong(COLUMN_ROW_ID, rowId + i);
- row.addString(COLUMN_CLIENT, id);
- row.addInt(COLUMN_UPDATE_COUNT, 0);
- }
- session.apply(put);
+ long keyOne = Bytes.getLong(data[i]);
+ long keyTwo = Bytes.getLong(data[i], 8);
- if (i % 1000 == 0) {
- // Tickle progress every so often else maprunner will think us hung
- output.progress();
- }
+ row.addLong(COLUMN_KEY_ONE, keyOne);
+ row.addLong(COLUMN_KEY_TWO, keyTwo);
+
+ // prev is null for the first line, we'll update it at the end.
+ if (prev == null) {
+ row.setNull(COLUMN_PREV_ONE);
+ row.setNull(COLUMN_PREV_TWO);
+ } else {
+ row.addLong(COLUMN_PREV_ONE, Bytes.getLong(prev[i]));
+ row.addLong(COLUMN_PREV_TWO, Bytes.getLong(prev[i], 8));
}
- session.flush();
- } catch (Exception ex) {
- throw new IOException(ex);
+ if (!update) {
+ // We only add those for new inserts, we don't update the heads with a new row, etc.
+ row.addLong(COLUMN_ROW_ID, rowId + i);
+ row.addString(COLUMN_CLIENT, id);
+ row.addInt(COLUMN_UPDATE_COUNT, 0);
+ }
+ session.apply(put);
+
+ if (i % 1000 == 0) {
+ // Tickle progress every so often else maprunner will think us hung
+ output.progress();
+ }
}
+
+ session.flush();
}
}
@@ -702,9 +683,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
BigInteger max = BigInteger.valueOf(Long.MAX_VALUE);
BigInteger step = max.multiply(BigInteger.valueOf(2))
.divide(BigInteger.valueOf(numTablets));
- LOG.info(min.longValue());
- LOG.info(max.longValue());
- LOG.info(step.longValue());
+ LOG.info("min: {}, max: {}, step: {}", min, max, step);
PartialRow splitRow = schema.newPartialRow();
splitRow.addLong("key2", Long.MIN_VALUE);
for (int i = 1; i < numTablets; i++) {
@@ -790,7 +769,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
*/
static class Verify extends Configured implements Tool {
- private static final Log LOG = LogFactory.getLog(Verify.class);
+ private static final Logger LOG = LoggerFactory.getLogger(Verify.class);
private static final BytesWritable DEF = new BytesWritable(NO_KEY);
private static final Joiner COMMA_JOINER = Joiner.on(",");
private static final byte[] rowKey = new byte[ROWKEY_LENGTH];
@@ -831,8 +810,6 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
public static class VerifyReducer extends Reducer<BytesWritable,BytesWritable,Text,Text> {
private ArrayList<byte[]> refs = new ArrayList<byte[]>();
- private AtomicInteger rows = new AtomicInteger(0);
-
@Override
public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
throws IOException, InterruptedException {
@@ -1002,7 +979,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
*/
static class Loop extends Configured implements Tool {
- private static final Log LOG = LogFactory.getLog(Loop.class);
+ private static final Logger LOG = LoggerFactory.getLogger(Loop.class);
IntegrationTestBigLinkedList it;
@@ -1190,7 +1167,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
*/
private static class Updater extends Configured implements Tool {
- private static final Log LOG = LogFactory.getLog(Updater.class);
+ private static final Logger LOG = LoggerFactory.getLogger(Updater.class);
private static final String MAX_LINK_UPDATES_PER_MAPPER = "kudu.updates.per.mapper";
@@ -1218,10 +1195,10 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
* Schema we use when getting rows from the linked list, we only need the reference and
* its update count.
*/
- private static final List<String> SCAN_COLUMN_NAMES = ImmutableList.of(
- COLUMN_PREV_ONE, COLUMN_PREV_TWO, COLUMN_UPDATE_COUNT, COLUMN_CLIENT);
+ private static final List<String> SCAN_COLUMN_NAMES =
+ ImmutableList.of(COLUMN_PREV_ONE, COLUMN_PREV_TWO, COLUMN_UPDATE_COUNT, COLUMN_CLIENT);
- private long numUpdatesPerMapper;
+ private int numUpdatesPerMapper;
/**
* Processing each linked list takes minutes, meaning that it's easily possible for our
@@ -1231,22 +1208,14 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
private List<Pair<Long, Long>> headsCache;
@Override
- protected void setup(Context context) throws IOException, InterruptedException {
+ protected void setup(Context context) throws KuduException {
Configuration conf = context.getConfiguration();
CommandLineParser parser = new CommandLineParser(conf);
client = parser.getClient();
- try {
- table = client.openTable(getTableName(conf));
- } catch (Exception e) {
- throw new IOException("Couldn't open the linked list table", e);
- }
+ table = client.openTable(getTableName(conf));
session = client.newSession();
-
- Schema tableSchema = table.getSchema();
-
-
- numUpdatesPerMapper = conf.getLong(MAX_LINK_UPDATES_PER_MAPPER, 1);
- headsCache = new ArrayList<Pair<Long, Long>>((int)numUpdatesPerMapper);
+ numUpdatesPerMapper = conf.getInt(MAX_LINK_UPDATES_PER_MAPPER, 1);
+ headsCache = new ArrayList<>(numUpdatesPerMapper);
}
@Override
@@ -1340,19 +1309,11 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
/**
* Finds the next node in the linked list.
*/
- private RowResult nextNode(long prevKeyOne, long prevKeyTwo) throws IOException {
+ private RowResult nextNode(long prevKeyOne, long prevKeyTwo) throws KuduException {
KuduScanner.KuduScannerBuilder builder = client.newScannerBuilder(table)
- .setProjectedColumnNames(SCAN_COLUMN_NAMES);
-
+ .setProjectedColumnNames(SCAN_COLUMN_NAMES);
configureScannerForRandomRead(builder, table, prevKeyOne, prevKeyTwo);
-
- try {
- return getOneRowResult(builder.build());
- } catch (Exception e) {
- // Goes right out and fails the job.
- throw new IOException("Couldn't read the following row: " +
- getStringFromKeys(prevKeyOne, prevKeyTwo), e);
- }
+ return getOneRowResult(builder.build());
}
private void updateRow(long keyOne, long keyTwo, int newCount) throws IOException {
@@ -1361,13 +1322,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
row.addLong(COLUMN_KEY_ONE, keyOne);
row.addLong(COLUMN_KEY_TWO, keyTwo);
row.addInt(COLUMN_UPDATE_COUNT, newCount);
- try {
- session.apply(update);
- } catch (Exception e) {
- // Goes right out and fails the job.
- throw new IOException("Couldn't update the following row: " +
- getStringFromKeys(keyOne, keyTwo), e);
- }
+ session.apply(update);
}
/**
@@ -1399,14 +1354,9 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
}
@Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- try {
- session.close();
- client.shutdown();
- } catch (Exception ex) {
- // Goes right out and fails the job.
- throw new IOException("Coulnd't close the scanner after the task completed", ex);
- }
+ protected void cleanup(Context context) throws KuduException {
+ session.close();
+ client.shutdown();
}
}
@@ -1554,15 +1504,11 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
System.out.println("Walking with " + getStringFromKeys(keyOne, keyTwo));
- try {
- walk(keyOne, keyTwo, maxNumNodes);
- } catch (Exception e) {
- throw new IOException(e);
- }
+ walk(keyOne, keyTwo, maxNumNodes);
return 0;
}
- private void walk(long headKeyOne, long headKeyTwo, int maxNumNodes) throws Exception {
+ private void walk(long headKeyOne, long headKeyTwo, int maxNumNodes) throws KuduException {
CommandLineParser parser = new CommandLineParser(getConf());
client = parser.getClient();
table = client.openTable(getTableName(getConf()));
@@ -1591,7 +1537,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
maxNumNodes));
}
- private RowResult nextNode(long keyOne, long keyTwo) throws Exception {
+ private RowResult nextNode(long keyOne, long keyTwo) throws KuduException {
KuduScanner.KuduScannerBuilder builder = client.newScannerBuilder(table);
configureScannerForRandomRead(builder, table, keyOne, keyTwo);
@@ -1646,14 +1592,14 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
return new StringBuilder().append(key1).append(",").append(key2).toString();
}
- private static RowResult getOneRowResult(KuduScanner scanner) throws Exception {
+ private static RowResult getOneRowResult(KuduScanner scanner) throws KuduException {
RowResultIterator rowResults;
rowResults = scanner.nextRows();
if (rowResults.getNumRows() == 0) {
return null;
}
if (rowResults.getNumRows() > 1) {
- throw new Exception("Received too many rows from scanner " + scanner);
+ throw new RuntimeException("Received too many rows from scanner " + scanner);
}
return rowResults.next();
}
@@ -1693,7 +1639,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
- Tool tool = null;
+ Tool tool;
processOptions(args);
if (toRun.equals("Generator")) {
tool = new Generator();