You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2019/03/22 20:24:45 UTC

[accumulo] branch master updated: Validate job store on MR configure. Fixes #1039 (#1051)

This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new bbf74ab  Validate job store on MR configure. Fixes #1039 (#1051)
bbf74ab is described below

commit bbf74abb550fec38e882ba0e4cd29e12e97ac8f0
Author: Mike Miller <mm...@apache.org>
AuthorDate: Fri Mar 22 16:24:41 2019 -0400

    Validate job store on MR configure. Fixes #1039 (#1051)
---
 .../hadoop/mapred/AccumuloOutputFormat.java        |  1 +
 .../hadoop/mapreduce/AccumuloOutputFormat.java     |  1 +
 .../hadoopImpl/mapred/AccumuloRecordReader.java    |  1 +
 .../hadoopImpl/mapreduce/AccumuloRecordReader.java |  1 +
 .../mapreduce/InputFormatBuilderImpl.java          |  1 +
 .../mapreduce/OutputFormatBuilderImpl.java         |  1 +
 .../hadoopImpl/mapreduce/lib/ConfiguratorBase.java | 22 +++++++++++++++++++++-
 .../hadoop/mapred/AccumuloInputFormatTest.java     | 14 ++++++++++++++
 .../hadoop/mapred/AccumuloOutputFormatTest.java    | 14 ++++++++++++++
 .../hadoop/mapreduce/AccumuloInputFormatTest.java  | 16 ++++++++++++++++
 .../hadoop/mapreduce/AccumuloOutputFormatTest.java | 14 ++++++++++++++
 11 files changed, 85 insertions(+), 1 deletion(-)

diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormat.java
index 7ec3748..32ea954 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormat.java
@@ -47,6 +47,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
 
   @Override
   public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
+    OutputConfigurator.checkJobStored(CLASS, job);
     Properties clientProps = OutputConfigurator.getClientProperties(CLASS, job);
     AuthenticationToken token = ClientProperty.getAuthenticationToken(clientProps);
     try (AccumuloClient c = Accumulo.newClient().from(clientProps).build()) {
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormat.java
index 4e3d84a..54dd193 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormat.java
@@ -58,6 +58,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
 
   @Override
   public void checkOutputSpecs(JobContext job) throws IOException {
+    OutputConfigurator.checkJobStored(CLASS, job.getConfiguration());
     Properties clientProps = OutputConfigurator.getClientProperties(CLASS, job.getConfiguration());
     AuthenticationToken token = ClientProperty.getAuthenticationToken(clientProps);
     try (AccumuloClient c = Accumulo.newClient().from(clientProps).build()) {
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java
index 0340649..9e90c54 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java
@@ -436,6 +436,7 @@ public abstract class AccumuloRecordReader<K,V> implements RecordReader<K,V> {
    * {@link InputFormat}.
    */
   private static void validateOptions(JobConf job, Class<?> callingClass) throws IOException {
+    InputConfigurator.checkJobStored(callingClass, job);
     try (AccumuloClient client = InputConfigurator.createClient(callingClass, job)) {
       InputConfigurator.validatePermissions(callingClass, job, client);
     }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java
index c186e9e..b589314 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java
@@ -300,6 +300,7 @@ public abstract class AccumuloRecordReader<K,V> extends RecordReader<K,V> {
    */
   private static void validateOptions(JobContext context, Class<?> callingClass)
       throws IOException {
+    InputConfigurator.checkJobStored(callingClass, context.getConfiguration());
     try (AccumuloClient client = InputConfigurator.createClient(callingClass,
         context.getConfiguration())) {
       InputConfigurator.validatePermissions(callingClass, context.getConfiguration(), client);
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java
index 5b3bac0..fbee537 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java
@@ -228,6 +228,7 @@ public class InputFormatBuilderImpl<T>
     } else {
       InputConfigurator.setInputTableConfigs(callingClass, conf, tableConfigMap);
     }
+    InputConfigurator.setJobStored(callingClass, conf);
   }
 
   /**
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputFormatBuilderImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputFormatBuilderImpl.java
index d7582ea..ce8883e 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputFormatBuilderImpl.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputFormatBuilderImpl.java
@@ -94,6 +94,7 @@ public class OutputFormatBuilderImpl<T>
       OutputConfigurator.setDefaultTableName(callingClass, conf, defaultTableName.get());
     OutputConfigurator.setCreateTables(callingClass, conf, createTables);
     OutputConfigurator.setSimulationMode(callingClass, conf, simulationMode);
+    OutputConfigurator.setJobStored(callingClass, conf);
   }
 
   private void store(JobConf jobConf) {
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBase.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBase.java
index 5155b76..fc2fbd0 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBase.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBase.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.util.StringUtils;
 public class ConfiguratorBase {
 
   public enum ClientOpts {
-    CLIENT_PROPS, CLIENT_PROPS_FILE, IS_CONFIGURED
+    CLIENT_PROPS, CLIENT_PROPS_FILE, IS_CONFIGURED, STORE_JOB_CALLED
   }
 
   /**
@@ -192,4 +192,24 @@ public class ConfiguratorBase {
     return conf.getInt(enumToConfKey(GeneralOpts.VISIBILITY_CACHE_SIZE),
         Constants.DEFAULT_VISIBILITY_CACHE_SIZE);
   }
+
+  /**
+   * The store method was called.
+   *
+   * @since 2.0.0
+   */
+  public static void setJobStored(Class<?> implementingClass, Configuration conf) {
+    conf.setBoolean(enumToConfKey(implementingClass, ClientOpts.STORE_JOB_CALLED), true);
+  }
+
+  /**
+   * Checks if the job store method was called. If not throw exception.
+   *
+   * @since 2.0.0
+   */
+  public static void checkJobStored(Class<?> implementingClass, Configuration conf) {
+    if (!conf.getBoolean(enumToConfKey(implementingClass, ClientOpts.STORE_JOB_CALLED), false)) {
+      throw new IllegalStateException("Bad configuration: the store method was not called.");
+    }
+  }
 }
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java
index 65e1ddf..24e021d 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java
@@ -17,6 +17,7 @@
 package org.apache.accumulo.hadoop.mapred;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
@@ -225,4 +226,17 @@ public class AccumuloInputFormatTest {
 
     assertEquals(cols, InputConfigurator.getFetchedColumns(AccumuloInputFormat.class, job));
   }
+
+  @Test
+  public void testJobStoreException() throws Exception {
+    // test exception thrown when not calling store
+    AccumuloInputFormat.configure().clientProperties(clientProperties).table("table")
+        .auths(Authorizations.EMPTY);
+    AccumuloInputFormat aif = new AccumuloInputFormat();
+
+    try {
+      aif.getSplits(job, 1);
+      fail("IllegalStateException should have been thrown for not calling store");
+    } catch (IllegalStateException e) {}
+  }
 }
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormatTest.java
index dc69dd9..a46ade8 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormatTest.java
@@ -18,6 +18,7 @@ package org.apache.accumulo.hadoop.mapred;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.Properties;
@@ -77,4 +78,17 @@ public class AccumuloOutputFormatTest {
     myAOF.checkOutputSpecs(null, job);
   }
 
+  @Test
+  public void testJobStoreException() throws Exception {
+    JobConf job = new JobConf();
+
+    Properties cp = Accumulo.newClientProperties().to("test", "zk").as("blah", "blah").build();
+
+    AccumuloOutputFormat.configure().clientProperties(cp);
+    try {
+      new AccumuloOutputFormat().checkOutputSpecs(null, job);
+      fail("IllegalStateException should have been thrown.");
+    } catch (IllegalStateException e) {}
+  }
+
 }
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java
index ac22733..2fd58bf 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java
@@ -17,6 +17,7 @@
 package org.apache.accumulo.hadoop.mapreduce;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
@@ -237,4 +238,19 @@ public class AccumuloInputFormatTest {
     assertEquals(cols,
         InputConfigurator.getFetchedColumns(AccumuloInputFormat.class, job.getConfiguration()));
   }
+
+  @Test
+  public void testJobStoreException() throws Exception {
+    Job job = Job.getInstance();
+
+    // test exception thrown when not calling store
+    AccumuloInputFormat.configure().clientProperties(clientProperties).table("table")
+        .auths(Authorizations.EMPTY);
+    AccumuloInputFormat aif = new AccumuloInputFormat();
+
+    try {
+      aif.getSplits(job);
+      fail("IllegalStateException should have been thrown for not calling store");
+    } catch (IllegalStateException e) {}
+  }
 }
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormatTest.java
index 6351396..d16ee68 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormatTest.java
@@ -18,6 +18,7 @@ package org.apache.accumulo.hadoop.mapreduce;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.Properties;
@@ -79,4 +80,17 @@ public class AccumuloOutputFormatTest {
     myAOF.checkOutputSpecs(job);
   }
 
+  @Test
+  public void testJobStoreException() throws Exception {
+    Job job = Job.getInstance();
+
+    Properties cp = Accumulo.newClientProperties().to("test", "zk").as("blah", "blah").build();
+
+    AccumuloOutputFormat.configure().clientProperties(cp);
+    try {
+      new AccumuloOutputFormat().checkOutputSpecs(job);
+      fail("IllegalStateException should have been thrown.");
+    } catch (IllegalStateException e) {}
+  }
+
 }