You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2017/03/16 19:58:02 UTC

kudu git commit: ITBLL: clean up exception handling

Repository: kudu
Updated Branches:
  refs/heads/master 2e462afc8 -> 206a3f1f5


ITBLL: clean up exception handling

Change-Id: I8fcd624e709d0f3a931055c1b4b38aab2d5b2e37
Reviewed-on: http://gerrit.cloudera.org:8080/6407
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/206a3f1f
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/206a3f1f
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/206a3f1f

Branch: refs/heads/master
Commit: 206a3f1f567cca961bb71c3061883c0484e64d3e
Parents: 2e462af
Author: Dan Burkert <da...@apache.org>
Authored: Wed Mar 15 12:26:55 2017 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Thu Mar 16 19:57:10 2017 +0000

----------------------------------------------------------------------
 .../tools/IntegrationTestBigLinkedList.java     | 188 +++++++------------
 1 file changed, 67 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/206a3f1f/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java
----------------------------------------------------------------------
diff --git a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java
index 6171026..0eb1543 100644
--- a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java
+++ b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java
@@ -24,7 +24,6 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -34,8 +33,6 @@ import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
@@ -63,6 +60,8 @@ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
@@ -73,6 +72,7 @@ import org.apache.kudu.client.AbstractKuduScannerBuilder;
 import org.apache.kudu.client.Bytes;
 import org.apache.kudu.client.CreateTableOptions;
 import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
 import org.apache.kudu.client.KuduScanner;
 import org.apache.kudu.client.KuduSession;
 import org.apache.kudu.client.KuduTable;
@@ -89,7 +89,7 @@ import org.apache.kudu.util.Pair;
 /**
  * <p>
  * This is an integration test borrowed from goraci, written by Keith Turner,
- * which is in turn inspired by the Accumulo test called continous ingest (ci).
+ * which is in turn inspired by the Accumulo test called continuous ingest (ci).
  * The original source code can be found here:
  * </p>
  * <ul>
@@ -199,7 +199,7 @@ import org.apache.kudu.util.Pair;
  * Delete - Disabled. A standalone program that deletes a single node
  * </li>
  * <li>
- * Walker - Disabled. A standalong program that start following a linked list and emits timing
+ * Walker - Disabled. A standalone program that start following a linked list and emits timing
  * info.
  * </li>
  * </ul>
@@ -219,7 +219,7 @@ import org.apache.kudu.util.Pair;
  * spread over the Long.MIN_VALUE - Long.MAX_VALUE keyspace.
  * </li>
  * <li>
- * The Walker and Deleter progams were disabled to save some time but they can be re-enabled then
+ * The Walker and Deleter programs were disabled to save some time but they can be re-enabled then
  * ported to Kudu without too much effort.
  * </li>
  * </ul>
@@ -370,7 +370,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
    */
   static class Generator extends Configured implements Tool {
 
-    private static final Log LOG = LogFactory.getLog(Generator.class);
+    private static final Logger LOG = LoggerFactory.getLogger(Generator.class);
 
     private CommandLineParser parser;
     private KuduClient client;
@@ -435,7 +435,6 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
         public boolean nextKeyValue() throws IOException, InterruptedException {
           return count++ < numNodes;
         }
-
       }
 
       @Override
@@ -510,17 +509,13 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
       private int width;
 
       @Override
-      protected void setup(Context context) throws IOException, InterruptedException {
+      protected void setup(Context context) throws KuduException {
         id = "Job: " + context.getJobID() + " Task: " + context.getTaskAttemptID();
         Configuration conf = context.getConfiguration();
         CommandLineParser parser = new CommandLineParser(conf);
         client = parser.getClient();
-        try {
-          table = client.openTable(getTableName(conf));
-          headsTable = client.openTable(getHeadsTable(conf));
-        } catch (Exception e) {
-          throw new IOException(e);
-        }
+        table = client.openTable(getTableName(conf));
+        headsTable = client.openTable(getHeadsTable(conf));
         session = client.newSession();
         session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
         session.setMutationBufferSpace(WIDTH_DEFAULT);
@@ -538,14 +533,9 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
       }
 
       @Override
-      protected void cleanup(Context context) throws IOException, InterruptedException {
-        try {
+      protected void cleanup(Context context) throws KuduException {
           session.close();
           client.shutdown();
-        } catch (Exception ex) {
-          // ugh.
-          throw new IOException(ex);
-        }
       }
 
       @Override
@@ -578,12 +568,8 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
             PartialRow row = insert.getRow();
             row.addLong(COLUMN_KEY_ONE,  Bytes.getLong(first[0]));
             row.addLong(COLUMN_KEY_TWO, Bytes.getLong(first[0], 8));
-            try {
-              session.apply(insert);
-              session.flush();
-            } catch (Exception e) {
-              throw new IOException("Couldn't flush the head row, " + insert, e);
-            }
+            session.apply(insert);
+            session.flush();
 
             first = null;
             prev = null;
@@ -599,46 +585,41 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
         first[first.length - 1] = ez;
       }
 
-      private void persist(Context output, byte[][] data, boolean update)
-          throws IOException {
-        try {
-          for (int i = 0; i < data.length; i++) {
-            Operation put = update ? table.newUpdate() : table.newInsert();
-            PartialRow row = put.getRow();
-
-            long keyOne = Bytes.getLong(data[i]);
-            long keyTwo = Bytes.getLong(data[i], 8);
-
-            row.addLong(COLUMN_KEY_ONE, keyOne);
-            row.addLong(COLUMN_KEY_TWO, keyTwo);
-
-            // prev is null for the first line, we'll update it at the end.
-            if (prev == null) {
-              row.setNull(COLUMN_PREV_ONE);
-              row.setNull(COLUMN_PREV_TWO);
-            } else {
-              row.addLong(COLUMN_PREV_ONE, Bytes.getLong(prev[i]));
-              row.addLong(COLUMN_PREV_TWO, Bytes.getLong(prev[i], 8));
-            }
+      private void persist(Context output, byte[][] data, boolean update) throws KuduException {
+        for (int i = 0; i < data.length; i++) {
+          Operation put = update ? table.newUpdate() : table.newInsert();
+          PartialRow row = put.getRow();
 
-            if (!update) {
-              // We only add those for new inserts, we don't update the heads with a new row, etc.
-              row.addLong(COLUMN_ROW_ID, rowId + i);
-              row.addString(COLUMN_CLIENT, id);
-              row.addInt(COLUMN_UPDATE_COUNT, 0);
-            }
-            session.apply(put);
+          long keyOne = Bytes.getLong(data[i]);
+          long keyTwo = Bytes.getLong(data[i], 8);
 
-            if (i % 1000 == 0) {
-              // Tickle progress every so often else maprunner will think us hung
-              output.progress();
-            }
+          row.addLong(COLUMN_KEY_ONE, keyOne);
+          row.addLong(COLUMN_KEY_TWO, keyTwo);
+
+          // prev is null for the first line, we'll update it at the end.
+          if (prev == null) {
+            row.setNull(COLUMN_PREV_ONE);
+            row.setNull(COLUMN_PREV_TWO);
+          } else {
+            row.addLong(COLUMN_PREV_ONE, Bytes.getLong(prev[i]));
+            row.addLong(COLUMN_PREV_TWO, Bytes.getLong(prev[i], 8));
           }
 
-          session.flush();
-        } catch (Exception ex) {
-          throw new IOException(ex);
+          if (!update) {
+            // We only add those for new inserts, we don't update the heads with a new row, etc.
+            row.addLong(COLUMN_ROW_ID, rowId + i);
+            row.addString(COLUMN_CLIENT, id);
+            row.addInt(COLUMN_UPDATE_COUNT, 0);
+          }
+          session.apply(put);
+
+          if (i % 1000 == 0) {
+            // Tickle progress every so often else maprunner will think us hung
+            output.progress();
+          }
         }
+
+        session.flush();
       }
     }
 
@@ -702,9 +683,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
         BigInteger max = BigInteger.valueOf(Long.MAX_VALUE);
         BigInteger step = max.multiply(BigInteger.valueOf(2))
             .divide(BigInteger.valueOf(numTablets));
-        LOG.info(min.longValue());
-        LOG.info(max.longValue());
-        LOG.info(step.longValue());
+        LOG.info("min: {}, max: {}, step: {}", min, max, step);
         PartialRow splitRow = schema.newPartialRow();
         splitRow.addLong("key2", Long.MIN_VALUE);
         for (int i = 1; i < numTablets; i++) {
@@ -790,7 +769,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
    */
   static class Verify extends Configured implements Tool {
 
-    private static final Log LOG = LogFactory.getLog(Verify.class);
+    private static final Logger LOG = LoggerFactory.getLogger(Verify.class);
     private static final BytesWritable DEF = new BytesWritable(NO_KEY);
     private static final Joiner COMMA_JOINER = Joiner.on(",");
     private static final byte[] rowKey = new byte[ROWKEY_LENGTH];
@@ -831,8 +810,6 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
     public static class VerifyReducer extends Reducer<BytesWritable,BytesWritable,Text,Text> {
       private ArrayList<byte[]> refs = new ArrayList<byte[]>();
 
-      private AtomicInteger rows = new AtomicInteger(0);
-
       @Override
       public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
           throws IOException, InterruptedException {
@@ -1002,7 +979,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
    */
   static class Loop extends Configured implements Tool {
 
-    private static final Log LOG = LogFactory.getLog(Loop.class);
+    private static final Logger LOG = LoggerFactory.getLogger(Loop.class);
 
     IntegrationTestBigLinkedList it;
 
@@ -1190,7 +1167,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
    */
   private static class Updater extends Configured implements Tool {
 
-    private static final Log LOG = LogFactory.getLog(Updater.class);
+    private static final Logger LOG = LoggerFactory.getLogger(Updater.class);
 
     private static final String MAX_LINK_UPDATES_PER_MAPPER = "kudu.updates.per.mapper";
 
@@ -1218,10 +1195,10 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
        * Schema we use when getting rows from the linked list, we only need the reference and
        * its update count.
        */
-      private static final List<String> SCAN_COLUMN_NAMES = ImmutableList.of(
-          COLUMN_PREV_ONE, COLUMN_PREV_TWO, COLUMN_UPDATE_COUNT, COLUMN_CLIENT);
+      private static final List<String> SCAN_COLUMN_NAMES =
+          ImmutableList.of(COLUMN_PREV_ONE, COLUMN_PREV_TWO, COLUMN_UPDATE_COUNT, COLUMN_CLIENT);
 
-      private long numUpdatesPerMapper;
+      private int numUpdatesPerMapper;
 
       /**
        * Processing each linked list takes minutes, meaning that it's easily possible for our
@@ -1231,22 +1208,14 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
       private List<Pair<Long, Long>> headsCache;
 
       @Override
-      protected void setup(Context context) throws IOException, InterruptedException {
+      protected void setup(Context context) throws KuduException {
         Configuration conf = context.getConfiguration();
         CommandLineParser parser = new CommandLineParser(conf);
         client = parser.getClient();
-        try {
-          table = client.openTable(getTableName(conf));
-        } catch (Exception e) {
-          throw new IOException("Couldn't open the linked list table", e);
-        }
+        table = client.openTable(getTableName(conf));
         session = client.newSession();
-
-        Schema tableSchema = table.getSchema();
-
-
-        numUpdatesPerMapper = conf.getLong(MAX_LINK_UPDATES_PER_MAPPER, 1);
-        headsCache = new ArrayList<Pair<Long, Long>>((int)numUpdatesPerMapper);
+        numUpdatesPerMapper = conf.getInt(MAX_LINK_UPDATES_PER_MAPPER, 1);
+        headsCache = new ArrayList<>(numUpdatesPerMapper);
       }
 
       @Override
@@ -1340,19 +1309,11 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
       /**
        * Finds the next node in the linked list.
        */
-      private RowResult nextNode(long prevKeyOne, long prevKeyTwo) throws IOException {
+      private RowResult nextNode(long prevKeyOne, long prevKeyTwo) throws KuduException {
         KuduScanner.KuduScannerBuilder builder = client.newScannerBuilder(table)
-            .setProjectedColumnNames(SCAN_COLUMN_NAMES);
-
+                                                       .setProjectedColumnNames(SCAN_COLUMN_NAMES);
         configureScannerForRandomRead(builder, table, prevKeyOne, prevKeyTwo);
-
-        try {
-          return getOneRowResult(builder.build());
-        } catch (Exception e) {
-          // Goes right out and fails the job.
-          throw new IOException("Couldn't read the following row: " +
-              getStringFromKeys(prevKeyOne, prevKeyTwo), e);
-        }
+        return getOneRowResult(builder.build());
       }
 
       private void updateRow(long keyOne, long keyTwo, int newCount) throws IOException {
@@ -1361,13 +1322,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
         row.addLong(COLUMN_KEY_ONE, keyOne);
         row.addLong(COLUMN_KEY_TWO, keyTwo);
         row.addInt(COLUMN_UPDATE_COUNT, newCount);
-        try {
-          session.apply(update);
-        } catch (Exception e) {
-          // Goes right out and fails the job.
-          throw new IOException("Couldn't update the following row: " +
-              getStringFromKeys(keyOne, keyTwo), e);
-        }
+        session.apply(update);
       }
 
       /**
@@ -1399,14 +1354,9 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
       }
 
       @Override
-      protected void cleanup(Context context) throws IOException, InterruptedException {
-        try {
-          session.close();
-          client.shutdown();
-        } catch (Exception ex) {
-          // Goes right out and fails the job.
-          throw new IOException("Coulnd't close the scanner after the task completed", ex);
-        }
+      protected void cleanup(Context context) throws KuduException {
+        session.close();
+        client.shutdown();
       }
     }
 
@@ -1554,15 +1504,11 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
 
       System.out.println("Walking with " + getStringFromKeys(keyOne, keyTwo));
 
-      try {
-        walk(keyOne, keyTwo, maxNumNodes);
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
+      walk(keyOne, keyTwo, maxNumNodes);
       return 0;
     }
 
-    private void walk(long headKeyOne, long headKeyTwo, int maxNumNodes) throws Exception {
+    private void walk(long headKeyOne, long headKeyTwo, int maxNumNodes) throws KuduException {
       CommandLineParser parser = new CommandLineParser(getConf());
       client = parser.getClient();
       table = client.openTable(getTableName(getConf()));
@@ -1591,7 +1537,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
           maxNumNodes));
     }
 
-    private RowResult nextNode(long keyOne, long keyTwo) throws Exception {
+    private RowResult nextNode(long keyOne, long keyTwo) throws KuduException {
       KuduScanner.KuduScannerBuilder builder = client.newScannerBuilder(table);
       configureScannerForRandomRead(builder, table, keyOne, keyTwo);
 
@@ -1646,14 +1592,14 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
     return new StringBuilder().append(key1).append(",").append(key2).toString();
   }
 
-  private static RowResult getOneRowResult(KuduScanner scanner) throws Exception {
+  private static RowResult getOneRowResult(KuduScanner scanner) throws KuduException {
     RowResultIterator rowResults;
     rowResults = scanner.nextRows();
     if (rowResults.getNumRows() == 0) {
       return null;
     }
     if (rowResults.getNumRows() > 1) {
-      throw new Exception("Received too many rows from scanner " + scanner);
+      throw new RuntimeException("Received too many rows from scanner " + scanner);
     }
     return rowResults.next();
   }
@@ -1693,7 +1639,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
 
   @Override
   public int run(String[] args) throws Exception {
-    Tool tool = null;
+    Tool tool;
     processOptions(args);
     if (toRun.equals("Generator")) {
       tool = new Generator();