Posted to commits@beam.apache.org by ar...@apache.org on 2019/05/20 08:53:09 UTC
[beam] branch master updated: [BEAM-7359] Fix static analysis issues for HadoopFormatIO (#8616)
This is an automated email from the ASF dual-hosted git repository.
aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a06b7a6 [BEAM-7359] Fix static analysis issues for HadoopFormatIO (#8616)
a06b7a6 is described below
commit a06b7a6cbad2386b7bee37d3be0d7f3f5b8bfaff
Author: Ismaël Mejía <ie...@gmail.com>
AuthorDate: Mon May 20 10:52:52 2019 +0200
[BEAM-7359] Fix static analysis issues for HadoopFormatIO (#8616)
---
.../sdk/io/hadoop/format/HDFSSynchronization.java | 2 +-
.../beam/sdk/io/hadoop/format/HadoopFormatIO.java | 122 +++++++++------------
.../beam/sdk/io/hadoop/format/HadoopFormats.java | 4 +-
.../format/ConfigurableEmployeeInputFormat.java | 2 +-
.../apache/beam/sdk/io/hadoop/format/Employee.java | 10 +-
.../io/hadoop/format/HDFSSynchronizationTest.java | 7 +-
.../hadoop/format/HadoopFormatIOCassandraTest.java | 2 -
.../hadoop/format/HadoopFormatIOElasticTest.java | 6 +-
.../sdk/io/hadoop/format/HadoopFormatIOIT.java | 2 +-
.../io/hadoop/format/HadoopFormatIOReadTest.java | 4 -
.../format/HadoopFormatIOSequenceFileTest.java | 18 +--
.../format/ReuseObjectsEmployeeInputFormat.java | 2 +-
.../sdk/io/hadoop/format/TestEmployeeDataSet.java | 2 +-
.../sdk/io/hadoop/format/TestRowDBWritable.java | 5 +-
14 files changed, 72 insertions(+), 116 deletions(-)
diff --git a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java
index 5a73ea1..60f60b2 100644
--- a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java
+++ b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java
@@ -180,7 +180,7 @@ public class HDFSSynchronization implements ExternalSynchronization {
* @param <X> exception type
*/
@FunctionalInterface
- public interface ThrowingFunction<T1, T2, X extends Exception> extends Serializable {
+ interface ThrowingFunction<T1, T2, X extends Exception> extends Serializable {
T2 apply(T1 value) throws X;
}
}
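
For context: ThrowingFunction is a functional interface whose apply method may throw a checked exception, so I/O-heavy lambdas can be passed around without wrapping exceptions at the call site. A minimal self-contained sketch of the same pattern (the interface is redeclared here purely for illustration, since the hunk above narrows the real one to package-private):

    import java.io.Serializable;

    public class ThrowingFunctionDemo {
      // Same shape as the interface in HDFSSynchronization: a function from T1
      // to T2 that may throw a checked exception of type X.
      @FunctionalInterface
      interface ThrowingFunction<T1, T2, X extends Exception> extends Serializable {
        T2 apply(T1 value) throws X;
      }

      public static void main(String[] args) throws Exception {
        // The lambda declares its checked exception through the target type,
        // so no try/catch or RuntimeException wrapping is needed here.
        ThrowingFunction<String, byte[], java.io.UnsupportedEncodingException> utf8 =
            s -> s.getBytes("UTF-8");
        System.out.println(utf8.apply("hello").length); // prints 5
      }
    }
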
diff --git a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
index 8819024..b39ec80 100644
--- a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
+++ b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
@@ -226,7 +226,7 @@ import org.slf4j.LoggerFactory;
* <li>{@code mapreduce.job.output.value.class}: The value class passed to the {@link
* OutputFormat} in {@code mapreduce.job.outputformat.class}.
* <li>{@code mapreduce.job.reduces}: Number of reduce tasks. Value is equal to number of write
- * tasks which will be genarated. This property is not required for {@link
+ * tasks which will be generated. This property is not required for {@link
* Write.PartitionedWriterBuilder#withoutPartitioning()} write.
* <li>{@code mapreduce.job.partitioner.class}: Hadoop partitioner class which will be used for
* distributing of records among partitions. This property is not required for {@link
@@ -296,7 +296,7 @@ import org.slf4j.LoggerFactory;
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
public class HadoopFormatIO {
- private static final Logger LOGGER = LoggerFactory.getLogger(HadoopFormatIO.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HadoopFormatIO.class);
/** {@link MRJobConfig#OUTPUT_FORMAT_CLASS_ATTR}. */
public static final String OUTPUT_FORMAT_CLASS_ATTR = MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR;
@@ -439,7 +439,7 @@ public class HadoopFormatIO {
// Sets key class to key translation function's output class type.
return toBuilder()
.setKeyTranslationFunction(function)
- .setKeyTypeDescriptor((TypeDescriptor<K>) function.getOutputTypeDescriptor())
+ .setKeyTypeDescriptor(function.getOutputTypeDescriptor())
.build();
}
@@ -449,7 +449,7 @@ public class HadoopFormatIO {
// Sets value class to value translation function's output class type.
return toBuilder()
.setValueTranslationFunction(function)
- .setValueTypeDescriptor((TypeDescriptor<V>) function.getOutputTypeDescriptor())
+ .setValueTypeDescriptor(function.getOutputTypeDescriptor())
.build();
}
@@ -633,25 +633,24 @@ public class HadoopFormatIO {
// desiredBundleSizeBytes is not being considered as splitting based on this
// value is not supported by inputFormat getSplits() method.
if (inputSplit != null) {
- LOGGER.info("Not splitting source {} because source is already split.", this);
- return ImmutableList.of((BoundedSource<KV<K, V>>) this);
+ LOG.info("Not splitting source {} because source is already split.", this);
+ return ImmutableList.of(this);
}
computeSplitsIfNecessary();
- LOGGER.info(
+ LOG.info(
"Generated {} splits. Size of first split is {} ",
inputSplits.size(),
inputSplits.get(0).getSplit().getLength());
return inputSplits.stream()
.map(
- serializableInputSplit -> {
- return new HadoopInputFormatBoundedSource<>(
- conf,
- keyCoder,
- valueCoder,
- keyTranslationFunction,
- valueTranslationFunction,
- serializableInputSplit);
- })
+ serializableInputSplit ->
+ new HadoopInputFormatBoundedSource<>(
+ conf,
+ keyCoder,
+ valueCoder,
+ keyTranslationFunction,
+ valueTranslationFunction,
+ serializableInputSplit))
.collect(Collectors.toList());
}
@@ -808,9 +807,7 @@ public class HadoopFormatIO {
public boolean start() throws IOException {
try {
recordsReturned.set(0L);
- recordReader =
- (RecordReader<T1, T2>)
- inputFormatObj.createRecordReader(split.getSplit(), taskAttemptContext);
+ recordReader = inputFormatObj.createRecordReader(split.getSplit(), taskAttemptContext);
if (recordReader != null) {
recordReader.initialize(split.getSplit(), taskAttemptContext);
progressValue.set(getProgress());
@@ -852,30 +849,23 @@ public class HadoopFormatIO {
@Override
public KV<K, V> getCurrent() {
- K key = null;
- V value = null;
+ K key;
+ V value;
try {
// Transform key if translation function is provided.
- key =
- transformKeyOrValue(
- (T1) recordReader.getCurrentKey(), keyTranslationFunction, keyCoder);
+ key = transformKeyOrValue(recordReader.getCurrentKey(), keyTranslationFunction, keyCoder);
// Transform value if translation function is provided.
value =
transformKeyOrValue(
- (T2) recordReader.getCurrentValue(), valueTranslationFunction, valueCoder);
+ recordReader.getCurrentValue(), valueTranslationFunction, valueCoder);
} catch (IOException | InterruptedException e) {
- LOGGER.error("Unable to read data: " + "{}", e);
+ LOG.error("Unable to read data: ", e);
throw new IllegalStateException("Unable to read data: " + "{}", e);
}
return KV.of(key, value);
}
- /**
- * Returns the serialized output of transformed key or value object.
- *
- * @throws ClassCastException
- * @throws CoderException
- */
+ /** Returns the serialized output of transformed key or value object. */
@SuppressWarnings("unchecked")
private <T, T3> T3 transformKeyOrValue(
T input, @Nullable SimpleFunction<T, T3> simpleFunction, Coder<T3> coder)
@@ -886,7 +876,7 @@ public class HadoopFormatIO {
} else {
output = (T3) input;
}
- return cloneIfPossiblyMutable((T3) output, coder);
+ return cloneIfPossiblyMutable(output, coder);
}
/**
@@ -909,7 +899,7 @@ public class HadoopFormatIO {
@Override
public void close() throws IOException {
- LOGGER.info("Closing reader after reading {} records.", recordsReturned);
+ LOG.info("Closing reader after reading {} records.", recordsReturned);
if (recordReader != null) {
recordReader.close();
recordReader = null;
@@ -929,21 +919,15 @@ public class HadoopFormatIO {
return progressValue.doubleValue();
}
- /**
- * Returns RecordReader's progress.
- *
- * @throws IOException
- * @throws InterruptedException
- */
+ /** Returns RecordReader's progress. */
private Double getProgress() throws IOException, InterruptedException {
try {
float progress = recordReader.getProgress();
return (double) progress < 0 || progress > 1 ? 0.0 : progress;
} catch (IOException e) {
- LOGGER.error(
+ LOG.error(
"Error in computing the fractions consumed as RecordReader.getProgress() throws an "
- + "exception : "
- + "{}",
+ + "exception : ",
e);
throw new IOException(
"Error in computing the fractions consumed as RecordReader.getProgress() throws an "
@@ -1012,7 +996,7 @@ public class HadoopFormatIO {
extends PTransform<
PCollection<KV<KeyT, ValueT>>, PCollection<KV<Integer, KV<KeyT, ValueT>>>> {
- private PCollectionView<Configuration> configView;
+ private final PCollectionView<Configuration> configView;
private GroupDataByPartition(PCollectionView<Configuration> configView) {
this.configView = configView;
@@ -1063,17 +1047,18 @@ public class HadoopFormatIO {
*/
public static class Write<KeyT, ValueT> extends PTransform<PCollection<KV<KeyT, ValueT>>, PDone> {
- @Nullable private transient Configuration configuration;
+ @Nullable private final transient Configuration configuration;
@Nullable
- private PTransform<PCollection<? extends KV<KeyT, ValueT>>, PCollectionView<Configuration>>
+ private final PTransform<
+ PCollection<? extends KV<KeyT, ValueT>>, PCollectionView<Configuration>>
configTransform;
- private ExternalSynchronization externalSynchronization;
+ private final ExternalSynchronization externalSynchronization;
- private boolean withPartitioning;
+ private final boolean withPartitioning;
- public Write(
+ Write(
@Nullable Configuration configuration,
@Nullable
PTransform<PCollection<? extends KV<KeyT, ValueT>>, PCollectionView<Configuration>>
@@ -1101,7 +1086,7 @@ public class HadoopFormatIO {
*
* <ul>
* <li>{@code mapreduce.job.reduces}: Number of reduce tasks. Value is equal to number of
- * write tasks which will be genarated.
+ * write tasks which will be generated.
* <li>{@code mapreduce.job.partitioner.class}: Hadoop partitioner class which will be used
* for distributing of records among partitions.
* </ul>
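
For context: this hunk and the earlier one list Configuration keys that a HadoopFormatIO write reads. A hedged sketch of populating them for a partitioned write (the concrete OutputFormat, key/value classes, output directory, and job id value are illustrative choices; the full class Javadoc is the authority on the complete required set):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.OutputFormat;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

    public class HadoopWriteConfSketch {
      static Configuration buildConf() {
        Configuration conf = new Configuration(false);
        // OutputFormat that performs the actual write, plus its output directory.
        conf.setClass("mapreduce.job.outputformat.class",
            SequenceFileOutputFormat.class, OutputFormat.class);
        conf.set("mapreduce.output.fileoutputformat.outputdir", "/tmp/hadoop-format-out");
        // Key/value classes handed to the OutputFormat.
        conf.setClass("mapreduce.job.output.key.class", Text.class, Object.class);
        conf.setClass("mapreduce.job.output.value.class", LongWritable.class, Object.class);
        // Write job identifier; the class Javadoc lists this among required properties.
        conf.set("mapreduce.job.id", "1");
        // Number of write tasks to generate; needed only with partitioning.
        conf.setInt("mapreduce.job.reduces", 2);
        // Partitioner distributing records among partitions; also partitioning-only.
        conf.setClass("mapreduce.job.partitioner.class",
            HashPartitioner.class, Partitioner.class);
        return conf;
      }
    }
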
@@ -1200,7 +1185,6 @@ public class HadoopFormatIO {
private Configuration configuration;
private PTransform<PCollection<? extends KV<KeyT, ValueT>>, PCollectionView<Configuration>>
configTransform;
- private ExternalSynchronization externalSynchronization;
private boolean isWithPartitioning;
@Override
@@ -1236,11 +1220,10 @@ public class HadoopFormatIO {
public Write<KeyT, ValueT> withExternalSynchronization(
ExternalSynchronization externalSynchronization) {
checkNotNull(externalSynchronization, "External synchronization cannot be null");
- this.externalSynchronization = externalSynchronization;
return new Write<>(
this.configuration,
this.configTransform,
- this.externalSynchronization,
+ externalSynchronization,
this.isWithPartitioning);
}
}
@@ -1415,14 +1398,13 @@ public class HadoopFormatIO {
*/
private static class TaskContext<KeyT, ValueT> {
- private RecordWriter<KeyT, ValueT> recordWriter;
- private OutputCommitter outputCommitter;
- private OutputFormat<KeyT, ValueT> outputFormatObj;
- private TaskAttemptContext taskAttemptContext;
+ private final RecordWriter<KeyT, ValueT> recordWriter;
+ private final OutputCommitter outputCommitter;
+ private final TaskAttemptContext taskAttemptContext;
TaskContext(TaskAttemptID taskAttempt, Configuration conf) {
taskAttemptContext = HadoopFormats.createTaskAttemptContext(conf, taskAttempt);
- outputFormatObj = HadoopFormats.createOutputFormatFromConfig(conf);
+ OutputFormat<KeyT, ValueT> outputFormatObj = HadoopFormats.createOutputFormatFromConfig(conf);
outputCommitter = initOutputCommitter(outputFormatObj, conf, taskAttemptContext);
recordWriter = initRecordWriter(outputFormatObj, taskAttemptContext);
}
@@ -1460,7 +1442,7 @@ public class HadoopFormatIO {
OutputFormat<KeyT, ValueT> outputFormatObj, TaskAttemptContext taskAttemptContext)
throws IllegalStateException {
try {
- LOGGER.info(
+ LOG.info(
"Creating new RecordWriter for task {} of Job with id {}.",
taskAttemptContext.getTaskAttemptID().getTaskID().getId(),
taskAttemptContext.getJobID().getJtIdentifier());
@@ -1535,9 +1517,9 @@ public class HadoopFormatIO {
*/
private static class SetupJobFn<KeyT, ValueT> extends DoFn<KV<KeyT, ValueT>, KV<KeyT, ValueT>> {
- private ExternalSynchronization externalSynchronization;
- private PCollectionView<Configuration> configView;
- private TypeDescriptor<KV<KeyT, ValueT>> inputTypeDescriptor;
+ private final ExternalSynchronization externalSynchronization;
+ private final PCollectionView<Configuration> configView;
+ private final TypeDescriptor<KV<KeyT, ValueT>> inputTypeDescriptor;
private boolean isSetupJobAttempted;
SetupJobFn(
@@ -1666,13 +1648,13 @@ public class HadoopFormatIO {
jobOutputFormat.checkOutputSpecs(setupTaskContext);
jobOutputFormat.getOutputCommitter(setupTaskContext).setupJob(setupTaskContext);
- LOGGER.info(
+ LOG.info(
"Job with id {} successfully configured from window with max timestamp {}.",
jobId.getJtIdentifier(),
window.maxTimestamp());
} catch (FileAlreadyExistsException e) {
- LOGGER.info("Job was already set by other worker. Skipping rest of the setup.");
+ LOG.info("Job was already set by other worker. Skipping rest of the setup.");
} catch (Exception e) {
throw new RuntimeException("Unable to setup job.", e);
}
@@ -1686,8 +1668,8 @@ public class HadoopFormatIO {
*/
private static class CommitJobFn<T> extends DoFn<Iterable<T>, Void> {
- private PCollectionView<Configuration> configView;
- private ExternalSynchronization externalSynchronization;
+ private final PCollectionView<Configuration> configView;
+ private final ExternalSynchronization externalSynchronization;
CommitJobFn(
PCollectionView<Configuration> configView,
@@ -1734,7 +1716,7 @@ public class HadoopFormatIO {
private static class AssignTaskFn<KeyT, ValueT>
extends DoFn<KV<KeyT, ValueT>, KV<Integer, KV<KeyT, ValueT>>> {
- private PCollectionView<Configuration> configView;
+ private final PCollectionView<Configuration> configView;
// Transient properties because they are used only for one bundle
/** Cache of created TaskIDs for given bundle. */
@@ -1879,7 +1861,7 @@ public class HadoopFormatIO {
taskContext.getRecordWriter().close(taskContext.getTaskAttemptContext());
taskContext.getOutputCommitter().commitTask(taskContext.getTaskAttemptContext());
- LOGGER.info("Write task for {} was successfully committed!", taskContext);
+ LOG.info("Write task for {} was successfully committed!", taskContext);
} catch (Exception e) {
processTaskException(taskContext, e);
}
@@ -1890,7 +1872,7 @@ public class HadoopFormatIO {
}
private void processTaskException(TaskContext<KeyT, ValueT> taskContext, Exception e) {
- LOGGER.warn("Write task for {} failed. Will abort task.", taskContext);
+ LOG.warn("Write task for {} failed. Will abort task.", taskContext);
taskContext.abortTask();
throw new IllegalArgumentException(e);
}
@@ -1933,7 +1915,7 @@ public class HadoopFormatIO {
processTaskException(taskContext, e);
}
- LOGGER.info(
+ LOG.info(
"Task with id {} of job {} was successfully setup!",
taskId,
HadoopFormats.getJobId(conf).getJtIdentifier());
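
For context: combined with a Configuration like the sketch above, a partitioned write chains withConfiguration, withPartitioning, and withExternalSynchronization, as in the class Javadoc. A hedged end-to-end sketch (the pipeline contents, coders, and lock directory are illustrative, and HadoopWriteConfSketch is the hypothetical helper from the earlier sketch):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.KvCoder;
    import org.apache.beam.sdk.io.hadoop.WritableCoder;
    import org.apache.beam.sdk.io.hadoop.format.HDFSSynchronization;
    import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.KV;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;

    public class HadoopWriteSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        // Built as in the configuration sketch earlier.
        Configuration conf = HadoopWriteConfSketch.buildConf();
        p.apply(
                Create.of(KV.of(new Text("a"), new LongWritable(1L)))
                    .withCoder(
                        KvCoder.of(
                            WritableCoder.of(Text.class), WritableCoder.of(LongWritable.class))))
            .apply(
                "writeUsingHadoopFormat",
                HadoopFormatIO.<Text, LongWritable>write()
                    .withConfiguration(conf)
                    .withPartitioning()
                    .withExternalSynchronization(
                        // Lock directory used to coordinate concurrent workers.
                        new HDFSSynchronization("/tmp/hadoop-format-locks")));
        p.run().waitUntilFinish();
      }
    }
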
diff --git a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormats.java b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormats.java
index 61d9d5b..4386fb4 100644
--- a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormats.java
+++ b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormats.java
@@ -38,7 +38,7 @@ final class HadoopFormats {
private static final int DEFAULT_JOB_NUMBER = 0;
static final Class<HashPartitioner> DEFAULT_PARTITIONER_CLASS_ATTR = HashPartitioner.class;
- static final int DEFAULT_NUM_REDUCERS = 1;
+ private static final int DEFAULT_NUM_REDUCERS = 1;
private HadoopFormats() {}
@@ -92,7 +92,7 @@ final class HadoopFormats {
/**
* Creates {@link TaskAttemptContext}.
*
- * @param conf cofniguration
+ * @param conf configuration
* @param taskAttemptID taskAttemptId
* @return new {@link TaskAttemptContext}
*/
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/ConfigurableEmployeeInputFormat.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/ConfigurableEmployeeInputFormat.java
index 0846a96..5e5640b 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/ConfigurableEmployeeInputFormat.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/ConfigurableEmployeeInputFormat.java
@@ -61,7 +61,7 @@ class ConfigurableEmployeeInputFormat extends InputFormat<Text, Employee> implem
/**
* Returns InputSPlit list of {@link ConfigurableEmployeeInputSplit}. Throws exception if {@link
- * #setConf()} is not called.
+ * #setConf(Configuration)} is not called.
*/
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/Employee.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/Employee.java
index 49b5938..54cd47b 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/Employee.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/Employee.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.hadoop.format;
+import java.util.Objects;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
@@ -64,15 +65,10 @@ public class Employee {
Employee employeePojo = (Employee) o;
- if (empName != null ? !empName.equals(employeePojo.empName) : employeePojo.empName != null) {
+ if (!Objects.equals(empName, employeePojo.empName)) {
return false;
}
- if (empAddress != null
- ? !empAddress.equals(employeePojo.empAddress)
- : employeePojo.empAddress != null) {
- return false;
- }
- return true;
+ return Objects.equals(empAddress, employeePojo.empAddress);
}
@Override
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronizationTest.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronizationTest.java
index 48ac9b6..2a9236b 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronizationTest.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronizationTest.java
@@ -40,8 +40,7 @@ import org.mockito.Mockito;
/** Tests functionality of {@link HDFSSynchronization} class. */
@RunWith(JUnit4.class)
public class HDFSSynchronizationTest {
-
- public static final String DEFAULT_JOB_ID = String.valueOf(1);
+ private static final String DEFAULT_JOB_ID = String.valueOf(1);
@Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
private HDFSSynchronization tested;
@@ -79,9 +78,7 @@ public class HDFSSynchronizationTest {
/** Missing job id in configuration will throw exception. */
@Test(expected = NullPointerException.class)
public void testMissingJobId() {
-
Configuration conf = new Configuration();
-
tested.tryAcquireJobLock(conf);
}
@@ -105,7 +102,6 @@ public class HDFSSynchronizationTest {
@Test
public void testTaskIdLockAcquire() {
-
int tasksCount = 100;
for (int i = 0; i < tasksCount; i++) {
TaskID taskID = tested.acquireTaskIdLock(configuration);
@@ -132,7 +128,6 @@ public class HDFSSynchronizationTest {
@Test
public void testCatchingRemoteException() throws IOException {
-
FileSystem mockedFileSystem = Mockito.mock(FileSystem.class);
RemoteException thrownException =
new RemoteException(AlreadyBeingCreatedException.class.getName(), "Failed to CREATE_FILE");
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java
index 2e638dc..22161c7 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java
@@ -76,8 +76,6 @@ public class HadoopFormatIOCassandraTest implements Serializable {
/**
* Test to read data from embedded Cassandra instance and verify whether data is read
* successfully.
- *
- * @throws Exception
*/
@Test
public void testHIFReadForCassandra() {
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOElasticTest.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOElasticTest.java
index 2f49652..7341f71 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOElasticTest.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOElasticTest.java
@@ -237,11 +237,7 @@ public class HadoopFormatIOElasticTest implements Serializable {
}
node.client().admin().indices().prepareRefresh(ELASTIC_INDEX_NAME).get();
}
- /**
- * Shutdown the embedded instance.
- *
- * @throws IOException
- */
+ /** Shutdown the embedded instance. */
static void shutdown() throws IOException {
DeleteIndexRequest indexRequest = new DeleteIndexRequest(ELASTIC_INDEX_NAME);
node.client().admin().indices().delete(indexRequest).actionGet();
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java
index ccaa1b1..4e2f371 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java
@@ -260,7 +260,7 @@ public class HadoopFormatIOIT {
* Uses the input {@link TestRow} values as seeds to produce new {@link KV}s for {@link
* HadoopFormatIO}.
*/
- public static class ConstructDBOutputFormatRowFn
+ static class ConstructDBOutputFormatRowFn
extends DoFn<TestRow, KV<TestRowDBWritable, NullWritable>> {
@ProcessElement
public void processElement(ProcessContext c) {
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOReadTest.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOReadTest.java
index 8d3c340..7eacab6 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOReadTest.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOReadTest.java
@@ -130,9 +130,6 @@ public class HadoopFormatIOReadTest {
* This test validates {@link HadoopFormatIO.Read Read} object creation if {@link
* HadoopFormatIO.Read#withConfiguration(Configuration) withConfiguration(Configuration)} is
* called more than once.
- *
- * @throws InterruptedException
- * @throws IOException
*/
@Test
public void testReadBuildsCorrectlyIfWithConfigurationIsCalledMoreThanOneTime() {
@@ -780,7 +777,6 @@ public class HadoopFormatIOReadTest {
AvroCoder.of(Employee.class));
for (BoundedSource<KV<Text, Employee>> source : boundedSourceList) {
// Cast to HadoopInputFormatBoundedSource to access getInputFormat().
- @SuppressWarnings("unchecked")
HadoopInputFormatBoundedSource<Text, Employee> hifSource =
(HadoopInputFormatBoundedSource<Text, Employee>) source;
hifSource.createInputFormatInstance();
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOSequenceFileTest.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOSequenceFileTest.java
index 4802aae..89a5c5b 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOSequenceFileTest.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOSequenceFileTest.java
@@ -298,10 +298,7 @@ public class HadoopFormatIOSequenceFileTest {
private Map<String, Long> loadWrittenDataAsMap(String outputDirPath) {
return loadWrittenData(outputDirPath).stream()
.collect(
- Collectors.toMap(
- kv -> kv.getKey().toString(),
- kv -> kv.getValue().get(),
- (first, second) -> first + second));
+ Collectors.toMap(kv -> kv.getKey().toString(), kv -> kv.getValue().get(), Long::sum));
}
private <T> TimestampedValue<T> event(T eventValue, Long timestamp) {
@@ -311,7 +308,7 @@ public class HadoopFormatIOSequenceFileTest {
private static class ConvertToHadoopFormatFn<InputT, OutputT> extends DoFn<InputT, OutputT> {
- private SerializableFunction<InputT, OutputT> transformFn;
+ private final SerializableFunction<InputT, OutputT> transformFn;
ConvertToHadoopFormatFn(SerializableFunction<InputT, OutputT> transformFn) {
this.transformFn = transformFn;
@@ -328,19 +325,14 @@ public class HadoopFormatIOSequenceFileTest {
public void processElement(@DoFn.Element String element, OutputReceiver<String> receiver) {
receiver.output(element.toLowerCase());
}
-
- @Override
- public TypeDescriptor<String> getOutputTypeDescriptor() {
- return super.getOutputTypeDescriptor();
- }
}
private static class ConfigTransform<KeyT, ValueT>
extends PTransform<PCollection<? extends KV<KeyT, ValueT>>, PCollectionView<Configuration>> {
- private String outputDirPath;
- private Class<?> keyClass;
- private Class<?> valueClass;
+ private final String outputDirPath;
+ private final Class<?> keyClass;
+ private final Class<?> valueClass;
private int windowNum = 0;
private ConfigTransform(String outputDirPath, Class<?> keyClass, Class<?> valueClass) {
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/ReuseObjectsEmployeeInputFormat.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/ReuseObjectsEmployeeInputFormat.java
index 11f14d4..1e7a8e2 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/ReuseObjectsEmployeeInputFormat.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/ReuseObjectsEmployeeInputFormat.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
* This is a valid InputFormat for reading employee data which is available in the form of {@code
* List<KV>} as {@linkplain ReuseObjectsEmployeeRecordReader#employeeDataList employeeDataList}.
* {@linkplain ReuseObjectsEmployeeRecordReader#employeeDataList employeeDataList} is populated
- * using {@linkplain TestEmployeeDataSet#populateEmployeeDataNew()}.
+ * using {@linkplain TestEmployeeDataSet#populateEmployeeData()}.
*
* <p>{@linkplain ReuseObjectsEmployeeInputFormat} splits data into {@value
* TestEmployeeDataSet#NUMBER_OF_SPLITS} splits, each split having {@value
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestEmployeeDataSet.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestEmployeeDataSet.java
index 98db521..6e19f17 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestEmployeeDataSet.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestEmployeeDataSet.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.io.Text;
* Test Utils used in {@link EmployeeInputFormat} and {@link ReuseObjectsEmployeeInputFormat} for
* computing splits.
*/
-public class TestEmployeeDataSet {
+class TestEmployeeDataSet {
public static final long NUMBER_OF_RECORDS_IN_EACH_SPLIT = 5L;
public static final long NUMBER_OF_SPLITS = 3L;
private static final List<KV<String, String>> data = new ArrayList<>();
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestRowDBWritable.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestRowDBWritable.java
index cfa4e0b..f4e3677 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestRowDBWritable.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestRowDBWritable.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.mapreduce.lib.db.DBWritable;
* org.apache.hadoop.mapreduce.lib.db.DBInputFormat}.
*/
@DefaultCoder(AvroCoder.class)
-public class TestRowDBWritable extends TestRow implements DBWritable, Writable {
+class TestRowDBWritable extends TestRow implements DBWritable, Writable {
private Integer id;
private String name;
@@ -81,7 +81,8 @@ public class TestRowDBWritable extends TestRow implements DBWritable, Writable {
name = in.readUTF();
}
- static class PrepareStatementFromTestRow implements JdbcIO.PreparedStatementSetter<TestRow> {
+ private static class PrepareStatementFromTestRow
+ implements JdbcIO.PreparedStatementSetter<TestRow> {
@Override
public void setParameters(TestRow element, PreparedStatement statement) throws SQLException {
statement.setLong(1, element.id());