You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by kt...@apache.org on 2020/07/16 15:55:52 UTC
[fluo-recipes] branch master updated: Update to work with Accumulo 2.0.0 and Fluo 2.0.0 (#150)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/fluo-recipes.git
The following commit(s) were added to refs/heads/master by this push:
new cfc86ba Update to work with Accumulo 2.0.0 and Fluo 2.0.0 (#150)
cfc86ba is described below
commit cfc86ba4c1faf20da78f8a0fc149c737aca407c7
Author: Keith Turner <kt...@apache.org>
AuthorDate: Thu Jul 16 11:55:44 2020 -0400
Update to work with Accumulo 2.0.0 and Fluo 2.0.0 (#150)
---
.travis.yml | 1 -
modules/accumulo/pom.xml | 4 -
.../accumulo/export/function/AccumuloWriter.java | 21 +++--
.../fluo/recipes/accumulo/ops/TableOperations.java | 94 +++++++++++-----------
.../accumulo/src/main/spotbugs/exclude-filter.xml | 23 ++++++
modules/core/src/main/spotbugs/exclude-filter.xml | 23 ++++++
.../recipes/core/export/it/ExportTestBase.java | 4 +-
modules/spark/pom.xml | 15 +---
.../apache/fluo/recipes/spark/FluoSparkHelper.java | 75 ++++++++++-------
modules/spark/src/main/spotbugs/exclude-filter.xml | 23 ++++++
.../org/apache/fluo/recipes/test/FluoITHelper.java | 48 ++++++++++-
modules/test/src/main/spotbugs/exclude-filter.xml | 23 ++++++
pom.xml | 85 ++++++++++++++-----
13 files changed, 306 insertions(+), 133 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index a9745e7..f8b4173 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -29,6 +29,5 @@ before_script:
- unset _JAVA_OPTIONS
env:
- ADDITIONAL_MAVEN_OPTS=
- - ADDITIONAL_MAVEN_OPTS=-Daccumulo.version=1.9.2
script:
- mvn clean verify javadoc:jar $ADDITIONAL_MAVEN_OPTS
diff --git a/modules/accumulo/pom.xml b/modules/accumulo/pom.xml
index 0d4a5ea..569041f 100644
--- a/modules/accumulo/pom.xml
+++ b/modules/accumulo/pom.xml
@@ -25,10 +25,6 @@
<name>Apache Fluo Recipes for Apache Accumulo</name>
<dependencies>
<dependency>
- <groupId>commons-configuration</groupId>
- <artifactId>commons-configuration</artifactId>
- </dependency>
- <dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
<version>1</version>
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/function/AccumuloWriter.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/function/AccumuloWriter.java
index 23e9991..95fe81e 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/function/AccumuloWriter.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/function/AccumuloWriter.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
@@ -23,17 +23,15 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.ClientConfiguration;
-import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Mutation;
/**
@@ -59,21 +57,20 @@ class AccumuloWriter {
ExportTask(String instanceName, String zookeepers, String user, String password, String table)
throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
- ZooKeeperInstance zki = new ZooKeeperInstance(
- new ClientConfiguration().withInstance(instanceName).withZkHosts(zookeepers));
+ AccumuloClient client =
+ Accumulo.newClient().to(instanceName, zookeepers).as(user, password).build();
// TODO need to close batch writer
- Connector conn = zki.getConnector(user, new PasswordToken(password));
try {
- bw = conn.createBatchWriter(table, new BatchWriterConfig());
+ bw = client.createBatchWriter(table, new BatchWriterConfig());
} catch (TableNotFoundException tnfe) {
try {
- conn.tableOperations().create(table);
+ client.tableOperations().create(table);
} catch (TableExistsException e) {
// nothing to do
}
- bw = conn.createBatchWriter(table, new BatchWriterConfig());
+ bw = client.createBatchWriter(table, new BatchWriterConfig());
}
}
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java
index 5dea9e3..2ba6d65 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
@@ -18,11 +18,9 @@ package org.apache.fluo.recipes.accumulo.ops;
import java.util.List;
import java.util.TreeSet;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.ClientConfiguration;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.config.FluoConfiguration;
@@ -50,53 +48,51 @@ public class TableOperations {
private static final Logger logger = LoggerFactory.getLogger(TableOperations.class);
- private static Connector getConnector(FluoConfiguration fluoConfig) throws Exception {
-
- ZooKeeperInstance zki = new ZooKeeperInstance(
- new ClientConfiguration().withInstance(fluoConfig.getAccumuloInstance())
- .withZkHosts(fluoConfig.getAccumuloZookeepers()));
-
- Connector conn = zki.getConnector(fluoConfig.getAccumuloUser(),
- new PasswordToken(fluoConfig.getAccumuloPassword()));
- return conn;
+ private static AccumuloClient getClient(FluoConfiguration fluoConfig) throws Exception {
+ return Accumulo.newClient()
+ .to(fluoConfig.getAccumuloInstance(), fluoConfig.getAccumuloZookeepers())
+ .as(fluoConfig.getAccumuloUser(), fluoConfig.getAccumuloPassword()).build();
}
/**
* Make the requested table optimizations.
- *
+ *
* @param fluoConfig should contain information need to connect to Accumulo and name of Fluo table
* @param tableOptim Will perform these optimizations on Fluo table in Accumulo.
*/
public static void optimizeTable(FluoConfiguration fluoConfig, TableOptimizations tableOptim)
throws Exception {
- Connector conn = getConnector(fluoConfig);
- TreeSet<Text> splits = new TreeSet<>();
+ try (AccumuloClient client = getClient(fluoConfig)) {
- for (Bytes split : tableOptim.getSplits()) {
- splits.add(new Text(split.toArray()));
- }
+ TreeSet<Text> splits = new TreeSet<>();
- String table = fluoConfig.getAccumuloTable();
- conn.tableOperations().addSplits(table, splits);
-
- if (tableOptim.getTabletGroupingRegex() != null
- && !tableOptim.getTabletGroupingRegex().isEmpty()) {
- // was going to call :
- // conn.instanceOperations().testClassLoad(RGB_CLASS, TABLET_BALANCER_CLASS)
- // but that failed. See ACCUMULO-4068
-
- try {
- // setting this prop first intentionally because it should fail in 1.6
- conn.tableOperations().setProperty(table, RGB_PATTERN_PROP,
- tableOptim.getTabletGroupingRegex());
- conn.tableOperations().setProperty(table, RGB_DEFAULT_PROP, "none");
- conn.tableOperations().setProperty(table, TABLE_BALANCER_PROP, RGB_CLASS);
- } catch (AccumuloException e) {
- logger.warn("Unable to setup regex balancer (this is expected to fail in Accumulo 1.6.X) : "
- + e.getMessage());
- logger.debug("Unable to setup regex balancer (this is expected to fail in Accumulo 1.6.X)",
- e);
+ for (Bytes split : tableOptim.getSplits()) {
+ splits.add(new Text(split.toArray()));
+ }
+
+ String table = fluoConfig.getAccumuloTable();
+ client.tableOperations().addSplits(table, splits);
+
+ if (tableOptim.getTabletGroupingRegex() != null
+ && !tableOptim.getTabletGroupingRegex().isEmpty()) {
+ // was going to call :
+ // conn.instanceOperations().testClassLoad(RGB_CLASS, TABLET_BALANCER_CLASS)
+ // but that failed. See ACCUMULO-4068
+
+ try {
+ // setting this prop first intentionally because it should fail in 1.6
+ client.tableOperations().setProperty(table, RGB_PATTERN_PROP,
+ tableOptim.getTabletGroupingRegex());
+ client.tableOperations().setProperty(table, RGB_DEFAULT_PROP, "none");
+ client.tableOperations().setProperty(table, TABLE_BALANCER_PROP, RGB_CLASS);
+ } catch (AccumuloException e) {
+ logger
+ .warn("Unable to setup regex balancer (this is expected to fail in Accumulo 1.6.X) : "
+ + e.getMessage());
+ logger.debug(
+ "Unable to setup regex balancer (this is expected to fail in Accumulo 1.6.X)", e);
+ }
}
}
}
@@ -115,17 +111,16 @@ public class TableOperations {
* Compact all transient regions that were registered using {@link TransientRegistry}
*/
public static void compactTransient(FluoConfiguration fluoConfig) throws Exception {
- Connector conn = getConnector(fluoConfig);
-
- try (FluoClient client = FluoFactory.newClient(fluoConfig)) {
- SimpleConfiguration appConfig = client.getAppConfiguration();
+ try (AccumuloClient aclient = getClient(fluoConfig);
+ FluoClient fclient = FluoFactory.newClient(fluoConfig)) {
+ SimpleConfiguration appConfig = fclient.getAppConfiguration();
TransientRegistry transientRegistry = new TransientRegistry(appConfig);
List<RowRange> ranges = transientRegistry.getTransientRanges();
for (RowRange r : ranges) {
long t1 = System.currentTimeMillis();
- conn.tableOperations().compact(fluoConfig.getAccumuloTable(),
+ aclient.tableOperations().compact(fluoConfig.getAccumuloTable(),
new Text(r.getStart().toArray()), new Text(r.getEnd().toArray()), true, true);
long t2 = System.currentTimeMillis();
logger.info("Compacted {} in {}ms", r, (t2 - t1));
@@ -135,8 +130,9 @@ public class TableOperations {
public static void compactTransient(FluoConfiguration fluoConfig, RowRange tRange)
throws Exception {
- Connector conn = getConnector(fluoConfig);
- conn.tableOperations().compact(fluoConfig.getAccumuloTable(),
- new Text(tRange.getStart().toArray()), new Text(tRange.getEnd().toArray()), true, true);
+ try (AccumuloClient client = getClient(fluoConfig)) {
+ client.tableOperations().compact(fluoConfig.getAccumuloTable(),
+ new Text(tRange.getStart().toArray()), new Text(tRange.getEnd().toArray()), true, true);
+ }
}
}
diff --git a/modules/accumulo/src/main/spotbugs/exclude-filter.xml b/modules/accumulo/src/main/spotbugs/exclude-filter.xml
new file mode 100644
index 0000000..9cfeb5c
--- /dev/null
+++ b/modules/accumulo/src/main/spotbugs/exclude-filter.xml
@@ -0,0 +1,23 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<FindBugsFilter>
+ <Match>
+ <!-- Must ignore these everywhere, because of a javac byte code generation bug -->
+ <!-- https://github.com/spotbugs/spotbugs/issues/756 -->
+ <Bug pattern="RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE" />
+ </Match>
+</FindBugsFilter>
diff --git a/modules/core/src/main/spotbugs/exclude-filter.xml b/modules/core/src/main/spotbugs/exclude-filter.xml
new file mode 100644
index 0000000..9cfeb5c
--- /dev/null
+++ b/modules/core/src/main/spotbugs/exclude-filter.xml
@@ -0,0 +1,23 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<FindBugsFilter>
+ <Match>
+ <!-- Must ignore these everywhere, because of a javac byte code generation bug -->
+ <!-- https://github.com/spotbugs/spotbugs/issues/756 -->
+ <Bug pattern="RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE" />
+ </Match>
+</FindBugsFilter>
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportTestBase.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportTestBase.java
index 44480ee..f31a6e5 100644
--- a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportTestBase.java
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportTestBase.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
diff --git a/modules/spark/pom.xml b/modules/spark/pom.xml
index 556f591..c316d53 100644
--- a/modules/spark/pom.xml
+++ b/modules/spark/pom.xml
@@ -25,10 +25,6 @@
<name>Apache Fluo Recipes for Apache Spark</name>
<dependencies>
<dependency>
- <groupId>commons-configuration</groupId>
- <artifactId>commons-configuration</artifactId>
- </dependency>
- <dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
</dependency>
@@ -42,18 +38,11 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <exclusions>
- <exclusion>
- <!-- Causes conflicts with servlet-api in Spark -->
- <groupId>javax.servlet</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
- </exclusions>
+ <artifactId>hadoop-client-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.10</artifactId>
+ <artifactId>spark-core_2.12</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
diff --git a/modules/spark/src/main/java/org/apache/fluo/recipes/spark/FluoSparkHelper.java b/modules/spark/src/main/java/org/apache/fluo/recipes/spark/FluoSparkHelper.java
index 4467868..08ddae0 100644
--- a/modules/spark/src/main/java/org/apache/fluo/recipes/spark/FluoSparkHelper.java
+++ b/modules/spark/src/main/java/org/apache/fluo/recipes/spark/FluoSparkHelper.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
@@ -21,15 +21,13 @@ import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.fluo.api.config.FluoConfiguration;
@@ -65,7 +63,6 @@ public class FluoSparkHelper {
private Configuration hadoopConfig;
private Path tempBaseDir;
private FileSystem hdfs;
- private Connector defaultConn;
// @formatter:off
public FluoSparkHelper(FluoConfiguration fluoConfig, Configuration hadoopConfig,
@@ -74,7 +71,7 @@ public class FluoSparkHelper {
this.fluoConfig = fluoConfig;
this.hadoopConfig = hadoopConfig;
this.tempBaseDir = tempBaseDir;
- defaultConn = getAccumuloConnector(fluoConfig);
+ this.fluoConfig = fluoConfig;
try {
hdfs = FileSystem.get(hadoopConfig);
} catch (IOException e) {
@@ -102,20 +99,9 @@ public class FluoSparkHelper {
return pairRDD.map(t -> new RowColumnValue(t._1().getRow(), t._1().getColumn(), t._2()));
}
- private static Instance getInstance(FluoConfiguration config) {
- ClientConfiguration clientConfig = new ClientConfiguration()
- .withInstance(config.getAccumuloInstance()).withZkHosts(config.getAccumuloZookeepers())
- .withZkTimeout(config.getZookeeperTimeout() / 1000);
- return new ZooKeeperInstance(clientConfig);
- }
-
- private static Connector getAccumuloConnector(FluoConfiguration config) {
- try {
- return getInstance(config).getConnector(config.getAccumuloUser(),
- new PasswordToken(config.getAccumuloPassword()));
- } catch (AccumuloException | AccumuloSecurityException e) {
- throw new IllegalStateException(e);
- }
+ private static AccumuloClient getAccumuloClient(FluoConfiguration config) {
+ return Accumulo.newClient().to(config.getAccumuloInstance(), config.getAccumuloZookeepers())
+ .as(config.getAccumuloUser(), config.getAccumuloPassword()).build();
}
/**
@@ -158,7 +144,7 @@ public class FluoSparkHelper {
for (FluoKeyValue kv : fkvg.getKeyValues()) {
output.add(new Tuple2<>(kv.getKey(), kv.getValue()));
}
- return output;
+ return output.iterator();
});
bulkImportKvToAccumulo(kvData, fluoConfig.getAccumuloTable(), opts);
@@ -214,9 +200,9 @@ public class FluoSparkHelper {
BulkImportOptions opts) {
Path tempDir = getTempDir(opts);
- Connector conn = chooseConnector(opts);
- try {
+
+ try (AccumuloClient client = getAccumuloClient(fluoConfig)) {
if (hdfs.exists(tempDir)) {
throw new IllegalArgumentException("HDFS temp dir already exists: " + tempDir.toString());
}
@@ -234,6 +220,7 @@ public class FluoSparkHelper {
// bulk import data to Accumulo
log.info("Wrote data for bulk import to HDFS temp directory: {}", dataDir);
+ Connector conn = chooseConnector(client, opts);
conn.tableOperations().importDirectory(accumuloTable, dataDir.toString(), failDir.toString(),
false);
@@ -274,10 +261,12 @@ public class FluoSparkHelper {
* If this methods is not called, then a Connector will be created using properties in the
* FluoConfiguration supplied to
* {@link FluoSparkHelper#FluoSparkHelper(FluoConfiguration, Configuration, Path)}
- *
+ *
* @param conn Use this connector to bulk import files into Accumulo.
* @return this
+ * @deprecated use {@link #setAccumuloClient(AccumuloClient)}
*/
+ @Deprecated(since = "1.3.0", forRemoval = true)
public BulkImportOptions setAccumuloConnector(Connector conn) {
Objects.requireNonNull(conn);
this.conn = conn;
@@ -285,9 +274,29 @@ public class FluoSparkHelper {
}
/**
+ * If this method is not called, then a Client will be created using properties in the
+ * FluoConfiguration supplied to
+ * {@link FluoSparkHelper#FluoSparkHelper(FluoConfiguration, Configuration, Path)}
+ *
+ * @param client Use this client to bulk import files into Accumulo.
+ * @return this
+ *
+ * @since 1.3.0
+ */
+ public BulkImportOptions setAccumuloClient(AccumuloClient client) {
+ Objects.requireNonNull(client);
+ try {
+ this.conn = Connector.from(client);
+ } catch (AccumuloSecurityException | AccumuloException e) {
+ throw new RuntimeException(e);
+ }
+ return this;
+ }
+
+ /**
* If this method is not called, then a temp dir will be created based on the path passed
* supplied to {@link FluoSparkHelper#FluoSparkHelper(FluoConfiguration, Configuration, Path)}
- *
+ *
* @param tempDir Use this directory to store RFiles generated for bulk import.
* @return this
*/
@@ -323,16 +332,20 @@ public class FluoSparkHelper {
String accumuloTable, BulkImportOptions opts) {
// partition and sort data so that one file is created per an accumulo tablet
Partitioner accumuloPartitioner;
- try {
+ try (AccumuloClient client = getAccumuloClient(fluoConfig)) {
accumuloPartitioner = new AccumuloRangePartitioner(
- chooseConnector(opts).tableOperations().listSplits(accumuloTable));
+ chooseConnector(client, opts).tableOperations().listSplits(accumuloTable));
} catch (TableNotFoundException | AccumuloSecurityException | AccumuloException e) {
throw new IllegalStateException(e);
}
return data.repartitionAndSortWithinPartitions(accumuloPartitioner);
}
- private Connector chooseConnector(BulkImportOptions opts) {
- return opts.conn == null ? defaultConn : opts.conn;
+ private Connector chooseConnector(AccumuloClient client, BulkImportOptions opts) {
+ try {
+ return opts.conn == null ? Connector.from(client) : opts.conn;
+ } catch (AccumuloSecurityException | AccumuloException e) {
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/modules/spark/src/main/spotbugs/exclude-filter.xml b/modules/spark/src/main/spotbugs/exclude-filter.xml
new file mode 100644
index 0000000..9cfeb5c
--- /dev/null
+++ b/modules/spark/src/main/spotbugs/exclude-filter.xml
@@ -0,0 +1,23 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<FindBugsFilter>
+ <Match>
+ <!-- Must ignore these everywhere, because of a javac byte code generation bug -->
+ <!-- https://github.com/spotbugs/spotbugs/issues/756 -->
+ <Bug pattern="RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE" />
+ </Match>
+</FindBugsFilter>
diff --git a/modules/test/src/main/java/org/apache/fluo/recipes/test/FluoITHelper.java b/modules/test/src/main/java/org/apache/fluo/recipes/test/FluoITHelper.java
index 3d3ae88..80a740f 100644
--- a/modules/test/src/main/java/org/apache/fluo/recipes/test/FluoITHelper.java
+++ b/modules/test/src/main/java/org/apache/fluo/recipes/test/FluoITHelper.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
@@ -24,6 +24,9 @@ import java.util.Map;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
@@ -145,7 +148,9 @@ public class FluoITHelper {
*
* @param conn Accumulo connector of to instance with table to print
* @param accumuloTable Accumulo table to print
+ * @deprecated since 1.3.0 use {@link #printAccumuloTable(AccumuloClient, String)}
*/
+ @Deprecated(since = "1.3.0", forRemoval = true)
public static void printAccumuloTable(Connector conn, String accumuloTable) {
Scanner scanner = null;
try {
@@ -163,6 +168,23 @@ public class FluoITHelper {
System.out.println("== accumulo end ==");
}
+ /**
+ * Prints specified Accumulo table
+ *
+ * @param client Accumulo client connected to the instance with the table to print
+ * @param accumuloTable Accumulo table to print
+ *
+ * @since 1.3.0
+ */
+ @SuppressWarnings("deprecation")
+ public static void printAccumuloTable(AccumuloClient client, String accumuloTable) {
+ try {
+ printAccumuloTable(Connector.from(client), accumuloTable);
+ } catch (AccumuloSecurityException | AccumuloException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private static boolean diff(String dataType, String expected, String actual) {
if (!expected.equals(actual)) {
log.error("Difference found in {} - expected {} actual {}", dataType, expected, actual);
@@ -187,7 +209,9 @@ public class FluoITHelper {
* @param accumuloTable Accumulo table with actual data
* @param expected RowColumnValue list containing expected data
* @return True if actual data matches expected data
+ * @deprecated since 1.3.0 use {@link #verifyAccumuloTable(AccumuloClient, String, Collection)}
*/
+ @Deprecated(since = "1.3.0", forRemoval = true)
public static boolean verifyAccumuloTable(Connector conn, String accumuloTable,
Collection<RowColumnValue> expected) {
@@ -232,6 +256,26 @@ public class FluoITHelper {
}
/**
+ * Verifies that actual data in Accumulo table matches expected data
+ *
+ * @param client Client from Accumulo instance with actual data
+ * @param accumuloTable Accumulo table with actual data
+ * @param expected RowColumnValue list containing expected data
+ * @return True if actual data matches expected data
+ *
+ * @since 1.3.0
+ */
+ @SuppressWarnings("deprecation")
+ public static boolean verifyAccumuloTable(AccumuloClient client, String accumuloTable,
+ Collection<RowColumnValue> expected) {
+ try {
+ return verifyAccumuloTable(Connector.from(client), accumuloTable, expected);
+ } catch (AccumuloSecurityException | AccumuloException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
* Verifies that expected list of RowColumnValues matches actual
*
* @param expected RowColumnValue list containing expected data
diff --git a/modules/test/src/main/spotbugs/exclude-filter.xml b/modules/test/src/main/spotbugs/exclude-filter.xml
new file mode 100644
index 0000000..9cfeb5c
--- /dev/null
+++ b/modules/test/src/main/spotbugs/exclude-filter.xml
@@ -0,0 +1,23 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<FindBugsFilter>
+ <Match>
+ <!-- Must ignore these everywhere, because of a javac byte code generation bug -->
+ <!-- https://github.com/spotbugs/spotbugs/issues/756 -->
+ <Bug pattern="RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE" />
+ </Match>
+</FindBugsFilter>
diff --git a/pom.xml b/pom.xml
index 6aabd31..0c604f1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,15 +50,16 @@
<url>https://github.com/apache/fluo-recipes/issues</url>
</issueManagement>
<properties>
- <accumulo.version>1.7.3</accumulo.version>
- <curator.version>2.7.1</curator.version>
- <findbugs.maxRank>13</findbugs.maxRank>
- <fluo.version>1.2.0</fluo.version>
- <hadoop.version>2.6.3</hadoop.version>
+ <accumulo.version>2.0.0</accumulo.version>
+ <curator.version>4.0.1</curator.version>
+ <!-- Prevent findbugs from running because it does not work with Java 11 and is configured to run by the parent pom. Spotbugs is configured in place of findbugs. -->
+ <findbugs.skip>true</findbugs.skip>
+ <fluo.version>2.0.0-SNAPSHOT</fluo.version>
+ <hadoop.version>3.1.0</hadoop.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<releaseProfiles>fluo-recipes-release</releaseProfiles>
- <spark.version>1.5.2</spark.version>
+ <spark.version>2.4.6</spark.version>
<zookeeper.version>3.4.8</zookeeper.version>
</properties>
<dependencyManagement>
@@ -69,6 +70,11 @@
<version>3.0.3</version>
</dependency>
<dependency>
+ <groupId>com.github.spotbugs</groupId>
+ <artifactId>spotbugs-annotations</artifactId>
+ <version>4.0.2</version>
+ </dependency>
+ <dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.2.4</version>
@@ -76,7 +82,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <version>13.0.1</version>
+ <version>27.0-jre</version>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
@@ -84,11 +90,6 @@
<version>3.2.1</version>
</dependency>
<dependency>
- <groupId>commons-configuration</groupId>
- <artifactId>commons-configuration</artifactId>
- <version>1.10</version>
- </dependency>
- <dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
@@ -185,12 +186,17 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
+ <artifactId>hadoop-client-api</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-runtime</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.10</artifactId>
+ <artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
@@ -206,7 +212,7 @@
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
- <version>2.10.4</version>
+ <version>2.12.11</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
@@ -219,6 +225,26 @@
<pluginManagement>
<plugins>
<plugin>
+ <groupId>com.github.spotbugs</groupId>
+ <artifactId>spotbugs-maven-plugin</artifactId>
+ <version>4.0.0</version>
+ <configuration>
+ <xmlOutput>true</xmlOutput>
+ <effort>Max</effort>
+ <failOnError>true</failOnError>
+ <includeTests>true</includeTests>
+ <maxRank>13</maxRank>
+ <jvmArgs>-Dcom.overstock.findbugs.ignore=com.google.common.util.concurrent.RateLimiter,com.google.common.hash.Hasher,com.google.common.hash.HashCode,com.google.common.hash.HashFunction,com.google.common.hash.Hashing,com.google.common.cache.Cache,com.google.common.io.CountingOutputStream,com.google.common.io.ByteStreams,com.google.common.cache.LoadingCache,com.google.common.base.Stopwatch,com.google.common.cache.RemovalNotification,com.google.common.util.concurrent.Uninterrupt [...]
+ <plugins combine.children="append">
+ <plugin>
+ <groupId>com.overstock.findbugs</groupId>
+ <artifactId>library-detectors</artifactId>
+ <version>1.2.0</version>
+ </plugin>
+ </plugins>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
@@ -232,6 +258,18 @@
</pluginManagement>
<plugins>
<plugin>
+ <groupId>com.github.spotbugs</groupId>
+ <artifactId>spotbugs-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>run-spotbugs</id>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
@@ -247,9 +285,7 @@
<ignoredDependency>org.apache.fluo:fluo-core:jar:${fluo.version}</ignoredDependency>
<ignoredDependency>org.apache.fluo:fluo-mini:jar:${fluo.version}</ignoredDependency>
<ignoredDependency>org.apache.fluo:fluo-recipes-kryo:jar:${project.version}</ignoredDependency>
- <ignoredDependency>org.apache.hadoop:hadoop-client:jar:${hadoop.version}</ignoredDependency>
- <ignoredDependency>org.apache.hadoop:hadoop-common:jar:${hadoop.version}</ignoredDependency>
- <ignoredDependency>org.apache.hadoop:hadoop-mapreduce-client-core:jar:${hadoop.version}</ignoredDependency>
+ <ignoredDependency>org.apache.hadoop:hadoop-client-api:jar:${hadoop.version}</ignoredDependency>
</ignoredDependencies>
</configuration>
</execution>
@@ -259,14 +295,25 @@
</build>
<profiles>
<profile>
+ <id>add-spotbugs-excludes</id>
+ <activation>
+ <file>
+ <exists>src/main/spotbugs/exclude-filter.xml</exists>
+ </file>
+ </activation>
+ <properties>
+ <spotbugs.excludeFilterFile>src/main/spotbugs/exclude-filter.xml</spotbugs.excludeFilterFile>
+ </properties>
+ </profile>
+ <profile>
<id>fluo-recipes-release</id>
<!-- some properties to make the release build a bit faster -->
<properties>
<checkstyle.skip>true</checkstyle.skip>
- <findbugs.skip>true</findbugs.skip>
<modernizer.skip>true</modernizer.skip>
<skipITs>true</skipITs>
<skipTests>true</skipTests>
+ <spotbugs.skip>true</spotbugs.skip>
</properties>
</profile>
</profiles>