You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2019/03/22 20:24:45 UTC
[accumulo] branch master updated: Validate job store on MR
configure. Fixes #1039 (#1051)
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new bbf74ab Validate job store on MR configure. Fixes #1039 (#1051)
bbf74ab is described below
commit bbf74abb550fec38e882ba0e4cd29e12e97ac8f0
Author: Mike Miller <mm...@apache.org>
AuthorDate: Fri Mar 22 16:24:41 2019 -0400
Validate job store on MR configure. Fixes #1039 (#1051)
---
.../hadoop/mapred/AccumuloOutputFormat.java | 1 +
.../hadoop/mapreduce/AccumuloOutputFormat.java | 1 +
.../hadoopImpl/mapred/AccumuloRecordReader.java | 1 +
.../hadoopImpl/mapreduce/AccumuloRecordReader.java | 1 +
.../mapreduce/InputFormatBuilderImpl.java | 1 +
.../mapreduce/OutputFormatBuilderImpl.java | 1 +
.../hadoopImpl/mapreduce/lib/ConfiguratorBase.java | 22 +++++++++++++++++++++-
.../hadoop/mapred/AccumuloInputFormatTest.java | 14 ++++++++++++++
.../hadoop/mapred/AccumuloOutputFormatTest.java | 14 ++++++++++++++
.../hadoop/mapreduce/AccumuloInputFormatTest.java | 16 ++++++++++++++++
.../hadoop/mapreduce/AccumuloOutputFormatTest.java | 14 ++++++++++++++
11 files changed, 85 insertions(+), 1 deletion(-)
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormat.java
index 7ec3748..32ea954 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormat.java
@@ -47,6 +47,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> {
@Override
public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
+ OutputConfigurator.checkJobStored(CLASS, job);
Properties clientProps = OutputConfigurator.getClientProperties(CLASS, job);
AuthenticationToken token = ClientProperty.getAuthenticationToken(clientProps);
try (AccumuloClient c = Accumulo.newClient().from(clientProps).build()) {
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormat.java
index 4e3d84a..54dd193 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormat.java
@@ -58,6 +58,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
@Override
public void checkOutputSpecs(JobContext job) throws IOException {
+ OutputConfigurator.checkJobStored(CLASS, job.getConfiguration());
Properties clientProps = OutputConfigurator.getClientProperties(CLASS, job.getConfiguration());
AuthenticationToken token = ClientProperty.getAuthenticationToken(clientProps);
try (AccumuloClient c = Accumulo.newClient().from(clientProps).build()) {
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java
index 0340649..9e90c54 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java
@@ -436,6 +436,7 @@ public abstract class AccumuloRecordReader<K,V> implements RecordReader<K,V> {
* {@link InputFormat}.
*/
private static void validateOptions(JobConf job, Class<?> callingClass) throws IOException {
+ InputConfigurator.checkJobStored(callingClass, job);
try (AccumuloClient client = InputConfigurator.createClient(callingClass, job)) {
InputConfigurator.validatePermissions(callingClass, job, client);
}
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java
index c186e9e..b589314 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java
@@ -300,6 +300,7 @@ public abstract class AccumuloRecordReader<K,V> extends RecordReader<K,V> {
*/
private static void validateOptions(JobContext context, Class<?> callingClass)
throws IOException {
+ InputConfigurator.checkJobStored(callingClass, context.getConfiguration());
try (AccumuloClient client = InputConfigurator.createClient(callingClass,
context.getConfiguration())) {
InputConfigurator.validatePermissions(callingClass, context.getConfiguration(), client);
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java
index 5b3bac0..fbee537 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java
@@ -228,6 +228,7 @@ public class InputFormatBuilderImpl<T>
} else {
InputConfigurator.setInputTableConfigs(callingClass, conf, tableConfigMap);
}
+ InputConfigurator.setJobStored(callingClass, conf);
}
/**
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputFormatBuilderImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputFormatBuilderImpl.java
index d7582ea..ce8883e 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputFormatBuilderImpl.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/OutputFormatBuilderImpl.java
@@ -94,6 +94,7 @@ public class OutputFormatBuilderImpl<T>
OutputConfigurator.setDefaultTableName(callingClass, conf, defaultTableName.get());
OutputConfigurator.setCreateTables(callingClass, conf, createTables);
OutputConfigurator.setSimulationMode(callingClass, conf, simulationMode);
+ OutputConfigurator.setJobStored(callingClass, conf);
}
private void store(JobConf jobConf) {
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBase.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBase.java
index 5155b76..fc2fbd0 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBase.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBase.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.util.StringUtils;
public class ConfiguratorBase {
public enum ClientOpts {
- CLIENT_PROPS, CLIENT_PROPS_FILE, IS_CONFIGURED
+ CLIENT_PROPS, CLIENT_PROPS_FILE, IS_CONFIGURED, STORE_JOB_CALLED
}
/**
@@ -192,4 +192,24 @@ public class ConfiguratorBase {
return conf.getInt(enumToConfKey(GeneralOpts.VISIBILITY_CACHE_SIZE),
Constants.DEFAULT_VISIBILITY_CACHE_SIZE);
}
+
+ /**
+ * The store method was called.
+ *
+ * @since 2.0.0
+ */
+ public static void setJobStored(Class<?> implementingClass, Configuration conf) {
+ conf.setBoolean(enumToConfKey(implementingClass, ClientOpts.STORE_JOB_CALLED), true);
+ }
+
+ /**
+ * Checks if the job store method was called. If not throw exception.
+ *
+ * @since 2.0.0
+ */
+ public static void checkJobStored(Class<?> implementingClass, Configuration conf) {
+ if (!conf.getBoolean(enumToConfKey(implementingClass, ClientOpts.STORE_JOB_CALLED), false)) {
+ throw new IllegalStateException("Bad configuration: the store method was not called.");
+ }
+ }
}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java
index 65e1ddf..24e021d 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java
@@ -17,6 +17,7 @@
package org.apache.accumulo.hadoop.mapred;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
@@ -225,4 +226,17 @@ public class AccumuloInputFormatTest {
assertEquals(cols, InputConfigurator.getFetchedColumns(AccumuloInputFormat.class, job));
}
+
+ @Test
+ public void testJobStoreException() throws Exception {
+ // test exception thrown when not calling store
+ AccumuloInputFormat.configure().clientProperties(clientProperties).table("table")
+ .auths(Authorizations.EMPTY);
+ AccumuloInputFormat aif = new AccumuloInputFormat();
+
+ try {
+ aif.getSplits(job, 1);
+ fail("IllegalStateException should have been thrown for not calling store");
+ } catch (IllegalStateException e) {}
+ }
}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormatTest.java
index dc69dd9..a46ade8 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloOutputFormatTest.java
@@ -18,6 +18,7 @@ package org.apache.accumulo.hadoop.mapred;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Properties;
@@ -77,4 +78,17 @@ public class AccumuloOutputFormatTest {
myAOF.checkOutputSpecs(null, job);
}
+ @Test
+ public void testJobStoreException() throws Exception {
+ JobConf job = new JobConf();
+
+ Properties cp = Accumulo.newClientProperties().to("test", "zk").as("blah", "blah").build();
+
+ AccumuloOutputFormat.configure().clientProperties(cp);
+ try {
+ new AccumuloOutputFormat().checkOutputSpecs(null, job);
+ fail("IllegalStateException should have been thrown.");
+ } catch (IllegalStateException e) {}
+ }
+
}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java
index ac22733..2fd58bf 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java
@@ -17,6 +17,7 @@
package org.apache.accumulo.hadoop.mapreduce;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
@@ -237,4 +238,19 @@ public class AccumuloInputFormatTest {
assertEquals(cols,
InputConfigurator.getFetchedColumns(AccumuloInputFormat.class, job.getConfiguration()));
}
+
+ @Test
+ public void testJobStoreException() throws Exception {
+ Job job = Job.getInstance();
+
+ // test exception thrown when not calling store
+ AccumuloInputFormat.configure().clientProperties(clientProperties).table("table")
+ .auths(Authorizations.EMPTY);
+ AccumuloInputFormat aif = new AccumuloInputFormat();
+
+ try {
+ aif.getSplits(job);
+ fail("IllegalStateException should have been thrown for not calling store");
+ } catch (IllegalStateException e) {}
+ }
}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormatTest.java
index 6351396..d16ee68 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloOutputFormatTest.java
@@ -18,6 +18,7 @@ package org.apache.accumulo.hadoop.mapreduce;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Properties;
@@ -79,4 +80,17 @@ public class AccumuloOutputFormatTest {
myAOF.checkOutputSpecs(job);
}
+ @Test
+ public void testJobStoreException() throws Exception {
+ Job job = Job.getInstance();
+
+ Properties cp = Accumulo.newClientProperties().to("test", "zk").as("blah", "blah").build();
+
+ AccumuloOutputFormat.configure().clientProperties(cp);
+ try {
+ new AccumuloOutputFormat().checkOutputSpecs(job);
+ fail("IllegalStateException should have been thrown.");
+ } catch (IllegalStateException e) {}
+ }
+
}