You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ar...@apache.org on 2019/05/20 08:53:09 UTC

[beam] branch master updated: [BEAM-7359] Fix static analysis issues for HadoopFormatIO (#8616)

This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a06b7a6  [BEAM-7359] Fix static analysis issues for HadoopFormatIO (#8616)
a06b7a6 is described below

commit a06b7a6cbad2386b7bee37d3be0d7f3f5b8bfaff
Author: Ismaël Mejía <ie...@gmail.com>
AuthorDate: Mon May 20 10:52:52 2019 +0200

    [BEAM-7359] Fix static analysis issues for HadoopFormatIO (#8616)
---
 .../sdk/io/hadoop/format/HDFSSynchronization.java  |   2 +-
 .../beam/sdk/io/hadoop/format/HadoopFormatIO.java  | 122 +++++++++------------
 .../beam/sdk/io/hadoop/format/HadoopFormats.java   |   4 +-
 .../format/ConfigurableEmployeeInputFormat.java    |   2 +-
 .../apache/beam/sdk/io/hadoop/format/Employee.java |  10 +-
 .../io/hadoop/format/HDFSSynchronizationTest.java  |   7 +-
 .../hadoop/format/HadoopFormatIOCassandraTest.java |   2 -
 .../hadoop/format/HadoopFormatIOElasticTest.java   |   6 +-
 .../sdk/io/hadoop/format/HadoopFormatIOIT.java     |   2 +-
 .../io/hadoop/format/HadoopFormatIOReadTest.java   |   4 -
 .../format/HadoopFormatIOSequenceFileTest.java     |  18 +--
 .../format/ReuseObjectsEmployeeInputFormat.java    |   2 +-
 .../sdk/io/hadoop/format/TestEmployeeDataSet.java  |   2 +-
 .../sdk/io/hadoop/format/TestRowDBWritable.java    |   5 +-
 14 files changed, 72 insertions(+), 116 deletions(-)

diff --git a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java
index 5a73ea1..60f60b2 100644
--- a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java
+++ b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java
@@ -180,7 +180,7 @@ public class HDFSSynchronization implements ExternalSynchronization {
    * @param <X> exception type
    */
   @FunctionalInterface
-  public interface ThrowingFunction<T1, T2, X extends Exception> extends Serializable {
+  interface ThrowingFunction<T1, T2, X extends Exception> extends Serializable {
     T2 apply(T1 value) throws X;
   }
 }
diff --git a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
index 8819024..b39ec80 100644
--- a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
+++ b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
@@ -226,7 +226,7 @@ import org.slf4j.LoggerFactory;
  *   <li>{@code mapreduce.job.output.value.class}: The value class passed to the {@link
  *       OutputFormat} in {@code mapreduce.job.outputformat.class}.
  *   <li>{@code mapreduce.job.reduces}: Number of reduce tasks. Value is equal to number of write
- *       tasks which will be genarated. This property is not required for {@link
+ *       tasks which will be generated. This property is not required for {@link
  *       Write.PartitionedWriterBuilder#withoutPartitioning()} write.
  *   <li>{@code mapreduce.job.partitioner.class}: Hadoop partitioner class which will be used for
  *       distributing of records among partitions. This property is not required for {@link
@@ -296,7 +296,7 @@ import org.slf4j.LoggerFactory;
  */
 @Experimental(Experimental.Kind.SOURCE_SINK)
 public class HadoopFormatIO {
-  private static final Logger LOGGER = LoggerFactory.getLogger(HadoopFormatIO.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HadoopFormatIO.class);
 
   /** {@link MRJobConfig#OUTPUT_FORMAT_CLASS_ATTR}. */
   public static final String OUTPUT_FORMAT_CLASS_ATTR = MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR;
@@ -439,7 +439,7 @@ public class HadoopFormatIO {
       // Sets key class to key translation function's output class type.
       return toBuilder()
           .setKeyTranslationFunction(function)
-          .setKeyTypeDescriptor((TypeDescriptor<K>) function.getOutputTypeDescriptor())
+          .setKeyTypeDescriptor(function.getOutputTypeDescriptor())
           .build();
     }
 
@@ -449,7 +449,7 @@ public class HadoopFormatIO {
       // Sets value class to value translation function's output class type.
       return toBuilder()
           .setValueTranslationFunction(function)
-          .setValueTypeDescriptor((TypeDescriptor<V>) function.getOutputTypeDescriptor())
+          .setValueTypeDescriptor(function.getOutputTypeDescriptor())
           .build();
     }
 
@@ -633,25 +633,24 @@ public class HadoopFormatIO {
       // desiredBundleSizeBytes is not being considered as splitting based on this
       // value is not supported by inputFormat getSplits() method.
       if (inputSplit != null) {
-        LOGGER.info("Not splitting source {} because source is already split.", this);
-        return ImmutableList.of((BoundedSource<KV<K, V>>) this);
+        LOG.info("Not splitting source {} because source is already split.", this);
+        return ImmutableList.of(this);
       }
       computeSplitsIfNecessary();
-      LOGGER.info(
+      LOG.info(
           "Generated {} splits. Size of first split is {} ",
           inputSplits.size(),
           inputSplits.get(0).getSplit().getLength());
       return inputSplits.stream()
           .map(
-              serializableInputSplit -> {
-                return new HadoopInputFormatBoundedSource<>(
-                    conf,
-                    keyCoder,
-                    valueCoder,
-                    keyTranslationFunction,
-                    valueTranslationFunction,
-                    serializableInputSplit);
-              })
+              serializableInputSplit ->
+                  new HadoopInputFormatBoundedSource<>(
+                      conf,
+                      keyCoder,
+                      valueCoder,
+                      keyTranslationFunction,
+                      valueTranslationFunction,
+                      serializableInputSplit))
           .collect(Collectors.toList());
     }
 
@@ -808,9 +807,7 @@ public class HadoopFormatIO {
       public boolean start() throws IOException {
         try {
           recordsReturned.set(0L);
-          recordReader =
-              (RecordReader<T1, T2>)
-                  inputFormatObj.createRecordReader(split.getSplit(), taskAttemptContext);
+          recordReader = inputFormatObj.createRecordReader(split.getSplit(), taskAttemptContext);
           if (recordReader != null) {
             recordReader.initialize(split.getSplit(), taskAttemptContext);
             progressValue.set(getProgress());
@@ -852,30 +849,23 @@ public class HadoopFormatIO {
 
       @Override
       public KV<K, V> getCurrent() {
-        K key = null;
-        V value = null;
+        K key;
+        V value;
         try {
           // Transform key if translation function is provided.
-          key =
-              transformKeyOrValue(
-                  (T1) recordReader.getCurrentKey(), keyTranslationFunction, keyCoder);
+          key = transformKeyOrValue(recordReader.getCurrentKey(), keyTranslationFunction, keyCoder);
           // Transform value if translation function is provided.
           value =
               transformKeyOrValue(
-                  (T2) recordReader.getCurrentValue(), valueTranslationFunction, valueCoder);
+                  recordReader.getCurrentValue(), valueTranslationFunction, valueCoder);
         } catch (IOException | InterruptedException e) {
-          LOGGER.error("Unable to read data: " + "{}", e);
+          LOG.error("Unable to read data: ", e);
           throw new IllegalStateException("Unable to read data: " + "{}", e);
         }
         return KV.of(key, value);
       }
 
-      /**
-       * Returns the serialized output of transformed key or value object.
-       *
-       * @throws ClassCastException
-       * @throws CoderException
-       */
+      /** Returns the serialized output of transformed key or value object. */
       @SuppressWarnings("unchecked")
       private <T, T3> T3 transformKeyOrValue(
           T input, @Nullable SimpleFunction<T, T3> simpleFunction, Coder<T3> coder)
@@ -886,7 +876,7 @@ public class HadoopFormatIO {
         } else {
           output = (T3) input;
         }
-        return cloneIfPossiblyMutable((T3) output, coder);
+        return cloneIfPossiblyMutable(output, coder);
       }
 
       /**
@@ -909,7 +899,7 @@ public class HadoopFormatIO {
 
       @Override
       public void close() throws IOException {
-        LOGGER.info("Closing reader after reading {} records.", recordsReturned);
+        LOG.info("Closing reader after reading {} records.", recordsReturned);
         if (recordReader != null) {
           recordReader.close();
           recordReader = null;
@@ -929,21 +919,15 @@ public class HadoopFormatIO {
         return progressValue.doubleValue();
       }
 
-      /**
-       * Returns RecordReader's progress.
-       *
-       * @throws IOException
-       * @throws InterruptedException
-       */
+      /** Returns RecordReader's progress. */
       private Double getProgress() throws IOException, InterruptedException {
         try {
           float progress = recordReader.getProgress();
           return (double) progress < 0 || progress > 1 ? 0.0 : progress;
         } catch (IOException e) {
-          LOGGER.error(
+          LOG.error(
               "Error in computing the fractions consumed as RecordReader.getProgress() throws an "
-                  + "exception : "
-                  + "{}",
+                  + "exception : ",
               e);
           throw new IOException(
               "Error in computing the fractions consumed as RecordReader.getProgress() throws an "
@@ -1012,7 +996,7 @@ public class HadoopFormatIO {
       extends PTransform<
           PCollection<KV<KeyT, ValueT>>, PCollection<KV<Integer, KV<KeyT, ValueT>>>> {
 
-    private PCollectionView<Configuration> configView;
+    private final PCollectionView<Configuration> configView;
 
     private GroupDataByPartition(PCollectionView<Configuration> configView) {
       this.configView = configView;
@@ -1063,17 +1047,18 @@ public class HadoopFormatIO {
    */
   public static class Write<KeyT, ValueT> extends PTransform<PCollection<KV<KeyT, ValueT>>, PDone> {
 
-    @Nullable private transient Configuration configuration;
+    @Nullable private final transient Configuration configuration;
 
     @Nullable
-    private PTransform<PCollection<? extends KV<KeyT, ValueT>>, PCollectionView<Configuration>>
+    private final PTransform<
+            PCollection<? extends KV<KeyT, ValueT>>, PCollectionView<Configuration>>
         configTransform;
 
-    private ExternalSynchronization externalSynchronization;
+    private final ExternalSynchronization externalSynchronization;
 
-    private boolean withPartitioning;
+    private final boolean withPartitioning;
 
-    public Write(
+    Write(
         @Nullable Configuration configuration,
         @Nullable
             PTransform<PCollection<? extends KV<KeyT, ValueT>>, PCollectionView<Configuration>>
@@ -1101,7 +1086,7 @@ public class HadoopFormatIO {
        *
        * <ul>
        *   <li>{@code mapreduce.job.reduces}: Number of reduce tasks. Value is equal to number of
-       *       write tasks which will be genarated.
+       *       write tasks which will be generated.
        *   <li>{@code mapreduce.job.partitioner.class}: Hadoop partitioner class which will be used
        *       for distributing of records among partitions.
        * </ul>
@@ -1200,7 +1185,6 @@ public class HadoopFormatIO {
       private Configuration configuration;
       private PTransform<PCollection<? extends KV<KeyT, ValueT>>, PCollectionView<Configuration>>
           configTransform;
-      private ExternalSynchronization externalSynchronization;
       private boolean isWithPartitioning;
 
       @Override
@@ -1236,11 +1220,10 @@ public class HadoopFormatIO {
       public Write<KeyT, ValueT> withExternalSynchronization(
           ExternalSynchronization externalSynchronization) {
         checkNotNull(externalSynchronization, "External synchronization cannot be null");
-        this.externalSynchronization = externalSynchronization;
         return new Write<>(
             this.configuration,
             this.configTransform,
-            this.externalSynchronization,
+            externalSynchronization,
             this.isWithPartitioning);
       }
     }
@@ -1415,14 +1398,13 @@ public class HadoopFormatIO {
    */
   private static class TaskContext<KeyT, ValueT> {
 
-    private RecordWriter<KeyT, ValueT> recordWriter;
-    private OutputCommitter outputCommitter;
-    private OutputFormat<KeyT, ValueT> outputFormatObj;
-    private TaskAttemptContext taskAttemptContext;
+    private final RecordWriter<KeyT, ValueT> recordWriter;
+    private final OutputCommitter outputCommitter;
+    private final TaskAttemptContext taskAttemptContext;
 
     TaskContext(TaskAttemptID taskAttempt, Configuration conf) {
       taskAttemptContext = HadoopFormats.createTaskAttemptContext(conf, taskAttempt);
-      outputFormatObj = HadoopFormats.createOutputFormatFromConfig(conf);
+      OutputFormat<KeyT, ValueT> outputFormatObj = HadoopFormats.createOutputFormatFromConfig(conf);
       outputCommitter = initOutputCommitter(outputFormatObj, conf, taskAttemptContext);
       recordWriter = initRecordWriter(outputFormatObj, taskAttemptContext);
     }
@@ -1460,7 +1442,7 @@ public class HadoopFormatIO {
         OutputFormat<KeyT, ValueT> outputFormatObj, TaskAttemptContext taskAttemptContext)
         throws IllegalStateException {
       try {
-        LOGGER.info(
+        LOG.info(
             "Creating new RecordWriter for task {} of Job with id {}.",
             taskAttemptContext.getTaskAttemptID().getTaskID().getId(),
             taskAttemptContext.getJobID().getJtIdentifier());
@@ -1535,9 +1517,9 @@ public class HadoopFormatIO {
    */
   private static class SetupJobFn<KeyT, ValueT> extends DoFn<KV<KeyT, ValueT>, KV<KeyT, ValueT>> {
 
-    private ExternalSynchronization externalSynchronization;
-    private PCollectionView<Configuration> configView;
-    private TypeDescriptor<KV<KeyT, ValueT>> inputTypeDescriptor;
+    private final ExternalSynchronization externalSynchronization;
+    private final PCollectionView<Configuration> configView;
+    private final TypeDescriptor<KV<KeyT, ValueT>> inputTypeDescriptor;
     private boolean isSetupJobAttempted;
 
     SetupJobFn(
@@ -1666,13 +1648,13 @@ public class HadoopFormatIO {
         jobOutputFormat.checkOutputSpecs(setupTaskContext);
         jobOutputFormat.getOutputCommitter(setupTaskContext).setupJob(setupTaskContext);
 
-        LOGGER.info(
+        LOG.info(
             "Job with id {} successfully configured from window with max timestamp {}.",
             jobId.getJtIdentifier(),
             window.maxTimestamp());
 
       } catch (FileAlreadyExistsException e) {
-        LOGGER.info("Job was already set by other worker. Skipping rest of the setup.");
+        LOG.info("Job was already set by other worker. Skipping rest of the setup.");
       } catch (Exception e) {
         throw new RuntimeException("Unable to setup job.", e);
       }
@@ -1686,8 +1668,8 @@ public class HadoopFormatIO {
    */
   private static class CommitJobFn<T> extends DoFn<Iterable<T>, Void> {
 
-    private PCollectionView<Configuration> configView;
-    private ExternalSynchronization externalSynchronization;
+    private final PCollectionView<Configuration> configView;
+    private final ExternalSynchronization externalSynchronization;
 
     CommitJobFn(
         PCollectionView<Configuration> configView,
@@ -1734,7 +1716,7 @@ public class HadoopFormatIO {
   private static class AssignTaskFn<KeyT, ValueT>
       extends DoFn<KV<KeyT, ValueT>, KV<Integer, KV<KeyT, ValueT>>> {
 
-    private PCollectionView<Configuration> configView;
+    private final PCollectionView<Configuration> configView;
 
     // Transient properties because they are used only for one bundle
     /** Cache of created TaskIDs for given bundle. */
@@ -1879,7 +1861,7 @@ public class HadoopFormatIO {
           taskContext.getRecordWriter().close(taskContext.getTaskAttemptContext());
           taskContext.getOutputCommitter().commitTask(taskContext.getTaskAttemptContext());
 
-          LOGGER.info("Write task for {} was successfully committed!", taskContext);
+          LOG.info("Write task for {} was successfully committed!", taskContext);
         } catch (Exception e) {
           processTaskException(taskContext, e);
         }
@@ -1890,7 +1872,7 @@ public class HadoopFormatIO {
     }
 
     private void processTaskException(TaskContext<KeyT, ValueT> taskContext, Exception e) {
-      LOGGER.warn("Write task for {} failed. Will abort task.", taskContext);
+      LOG.warn("Write task for {} failed. Will abort task.", taskContext);
       taskContext.abortTask();
       throw new IllegalArgumentException(e);
     }
@@ -1933,7 +1915,7 @@ public class HadoopFormatIO {
         processTaskException(taskContext, e);
       }
 
-      LOGGER.info(
+      LOG.info(
           "Task with id {} of job {} was successfully setup!",
           taskId,
           HadoopFormats.getJobId(conf).getJtIdentifier());
diff --git a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormats.java b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormats.java
index 61d9d5b..4386fb4 100644
--- a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormats.java
+++ b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormats.java
@@ -38,7 +38,7 @@ final class HadoopFormats {
 
   private static final int DEFAULT_JOB_NUMBER = 0;
   static final Class<HashPartitioner> DEFAULT_PARTITIONER_CLASS_ATTR = HashPartitioner.class;
-  static final int DEFAULT_NUM_REDUCERS = 1;
+  private static final int DEFAULT_NUM_REDUCERS = 1;
 
   private HadoopFormats() {}
 
@@ -92,7 +92,7 @@ final class HadoopFormats {
   /**
    * Creates {@link TaskAttemptContext}.
    *
-   * @param conf cofniguration
+   * @param conf configuration
    * @param taskAttemptID taskAttemptId
    * @return new {@link TaskAttemptContext}
    */
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/ConfigurableEmployeeInputFormat.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/ConfigurableEmployeeInputFormat.java
index 0846a96..5e5640b 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/ConfigurableEmployeeInputFormat.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/ConfigurableEmployeeInputFormat.java
@@ -61,7 +61,7 @@ class ConfigurableEmployeeInputFormat extends InputFormat<Text, Employee> implem
 
   /**
    * Returns InputSPlit list of {@link ConfigurableEmployeeInputSplit}. Throws exception if {@link
-   * #setConf()} is not called.
+   * #setConf(Configuration)} is not called.
    */
   @Override
   public List<InputSplit> getSplits(JobContext context) throws IOException {
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/Employee.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/Employee.java
index 49b5938..54cd47b 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/Employee.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/Employee.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.hadoop.format;
 
+import java.util.Objects;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 
@@ -64,15 +65,10 @@ public class Employee {
 
     Employee employeePojo = (Employee) o;
 
-    if (empName != null ? !empName.equals(employeePojo.empName) : employeePojo.empName != null) {
+    if (!Objects.equals(empName, employeePojo.empName)) {
       return false;
     }
-    if (empAddress != null
-        ? !empAddress.equals(employeePojo.empAddress)
-        : employeePojo.empAddress != null) {
-      return false;
-    }
-    return true;
+    return Objects.equals(empAddress, employeePojo.empAddress);
   }
 
   @Override
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronizationTest.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronizationTest.java
index 48ac9b6..2a9236b 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronizationTest.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronizationTest.java
@@ -40,8 +40,7 @@ import org.mockito.Mockito;
 /** Tests functionality of {@link HDFSSynchronization} class. */
 @RunWith(JUnit4.class)
 public class HDFSSynchronizationTest {
-
-  public static final String DEFAULT_JOB_ID = String.valueOf(1);
+  private static final String DEFAULT_JOB_ID = String.valueOf(1);
   @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
 
   private HDFSSynchronization tested;
@@ -79,9 +78,7 @@ public class HDFSSynchronizationTest {
   /** Missing job id in configuration will throw exception. */
   @Test(expected = NullPointerException.class)
   public void testMissingJobId() {
-
     Configuration conf = new Configuration();
-
     tested.tryAcquireJobLock(conf);
   }
 
@@ -105,7 +102,6 @@ public class HDFSSynchronizationTest {
 
   @Test
   public void testTaskIdLockAcquire() {
-
     int tasksCount = 100;
     for (int i = 0; i < tasksCount; i++) {
       TaskID taskID = tested.acquireTaskIdLock(configuration);
@@ -132,7 +128,6 @@ public class HDFSSynchronizationTest {
 
   @Test
   public void testCatchingRemoteException() throws IOException {
-
     FileSystem mockedFileSystem = Mockito.mock(FileSystem.class);
     RemoteException thrownException =
         new RemoteException(AlreadyBeingCreatedException.class.getName(), "Failed to CREATE_FILE");
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java
index 2e638dc..22161c7 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java
@@ -76,8 +76,6 @@ public class HadoopFormatIOCassandraTest implements Serializable {
   /**
    * Test to read data from embedded Cassandra instance and verify whether data is read
    * successfully.
-   *
-   * @throws Exception
    */
   @Test
   public void testHIFReadForCassandra() {
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOElasticTest.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOElasticTest.java
index 2f49652..7341f71 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOElasticTest.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOElasticTest.java
@@ -237,11 +237,7 @@ public class HadoopFormatIOElasticTest implements Serializable {
       }
       node.client().admin().indices().prepareRefresh(ELASTIC_INDEX_NAME).get();
     }
-    /**
-     * Shutdown the embedded instance.
-     *
-     * @throws IOException
-     */
+    /** Shutdown the embedded instance. */
     static void shutdown() throws IOException {
       DeleteIndexRequest indexRequest = new DeleteIndexRequest(ELASTIC_INDEX_NAME);
       node.client().admin().indices().delete(indexRequest).actionGet();
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java
index ccaa1b1..4e2f371 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java
@@ -260,7 +260,7 @@ public class HadoopFormatIOIT {
    * Uses the input {@link TestRow} values as seeds to produce new {@link KV}s for {@link
    * HadoopFormatIO}.
    */
-  public static class ConstructDBOutputFormatRowFn
+  static class ConstructDBOutputFormatRowFn
       extends DoFn<TestRow, KV<TestRowDBWritable, NullWritable>> {
     @ProcessElement
     public void processElement(ProcessContext c) {
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOReadTest.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOReadTest.java
index 8d3c340..7eacab6 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOReadTest.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOReadTest.java
@@ -130,9 +130,6 @@ public class HadoopFormatIOReadTest {
    * This test validates {@link HadoopFormatIO.Read Read} object creation if {@link
    * HadoopFormatIO.Read#withConfiguration(Configuration) withConfiguration(Configuration)} is
    * called more than once.
-   *
-   * @throws InterruptedException
-   * @throws IOException
    */
   @Test
   public void testReadBuildsCorrectlyIfWithConfigurationIsCalledMoreThanOneTime() {
@@ -780,7 +777,6 @@ public class HadoopFormatIOReadTest {
             AvroCoder.of(Employee.class));
     for (BoundedSource<KV<Text, Employee>> source : boundedSourceList) {
       // Cast to HadoopInputFormatBoundedSource to access getInputFormat().
-      @SuppressWarnings("unchecked")
       HadoopInputFormatBoundedSource<Text, Employee> hifSource =
           (HadoopInputFormatBoundedSource<Text, Employee>) source;
       hifSource.createInputFormatInstance();
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOSequenceFileTest.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOSequenceFileTest.java
index 4802aae..89a5c5b 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOSequenceFileTest.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOSequenceFileTest.java
@@ -298,10 +298,7 @@ public class HadoopFormatIOSequenceFileTest {
   private Map<String, Long> loadWrittenDataAsMap(String outputDirPath) {
     return loadWrittenData(outputDirPath).stream()
         .collect(
-            Collectors.toMap(
-                kv -> kv.getKey().toString(),
-                kv -> kv.getValue().get(),
-                (first, second) -> first + second));
+            Collectors.toMap(kv -> kv.getKey().toString(), kv -> kv.getValue().get(), Long::sum));
   }
 
   private <T> TimestampedValue<T> event(T eventValue, Long timestamp) {
@@ -311,7 +308,7 @@ public class HadoopFormatIOSequenceFileTest {
 
   private static class ConvertToHadoopFormatFn<InputT, OutputT> extends DoFn<InputT, OutputT> {
 
-    private SerializableFunction<InputT, OutputT> transformFn;
+    private final SerializableFunction<InputT, OutputT> transformFn;
 
     ConvertToHadoopFormatFn(SerializableFunction<InputT, OutputT> transformFn) {
       this.transformFn = transformFn;
@@ -328,19 +325,14 @@ public class HadoopFormatIOSequenceFileTest {
     public void processElement(@DoFn.Element String element, OutputReceiver<String> receiver) {
       receiver.output(element.toLowerCase());
     }
-
-    @Override
-    public TypeDescriptor<String> getOutputTypeDescriptor() {
-      return super.getOutputTypeDescriptor();
-    }
   }
 
   private static class ConfigTransform<KeyT, ValueT>
       extends PTransform<PCollection<? extends KV<KeyT, ValueT>>, PCollectionView<Configuration>> {
 
-    private String outputDirPath;
-    private Class<?> keyClass;
-    private Class<?> valueClass;
+    private final String outputDirPath;
+    private final Class<?> keyClass;
+    private final Class<?> valueClass;
     private int windowNum = 0;
 
     private ConfigTransform(String outputDirPath, Class<?> keyClass, Class<?> valueClass) {
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/ReuseObjectsEmployeeInputFormat.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/ReuseObjectsEmployeeInputFormat.java
index 11f14d4..1e7a8e2 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/ReuseObjectsEmployeeInputFormat.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/ReuseObjectsEmployeeInputFormat.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
  * This is a valid InputFormat for reading employee data which is available in the form of {@code
  * List<KV>} as {@linkplain ReuseObjectsEmployeeRecordReader#employeeDataList employeeDataList}.
  * {@linkplain ReuseObjectsEmployeeRecordReader#employeeDataList employeeDataList} is populated
- * using {@linkplain TestEmployeeDataSet#populateEmployeeDataNew()}.
+ * using {@linkplain TestEmployeeDataSet#populateEmployeeData()}.
  *
  * <p>{@linkplain ReuseObjectsEmployeeInputFormat} splits data into {@value
  * TestEmployeeDataSet#NUMBER_OF_SPLITS} splits, each split having {@value
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestEmployeeDataSet.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestEmployeeDataSet.java
index 98db521..6e19f17 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestEmployeeDataSet.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestEmployeeDataSet.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.io.Text;
  * Test Utils used in {@link EmployeeInputFormat} and {@link ReuseObjectsEmployeeInputFormat} for
  * computing splits.
  */
-public class TestEmployeeDataSet {
+class TestEmployeeDataSet {
   public static final long NUMBER_OF_RECORDS_IN_EACH_SPLIT = 5L;
   public static final long NUMBER_OF_SPLITS = 3L;
   private static final List<KV<String, String>> data = new ArrayList<>();
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestRowDBWritable.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestRowDBWritable.java
index cfa4e0b..f4e3677 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestRowDBWritable.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestRowDBWritable.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.mapreduce.lib.db.DBWritable;
  * org.apache.hadoop.mapreduce.lib.db.DBInputFormat}.
  */
 @DefaultCoder(AvroCoder.class)
-public class TestRowDBWritable extends TestRow implements DBWritable, Writable {
+class TestRowDBWritable extends TestRow implements DBWritable, Writable {
 
   private Integer id;
   private String name;
@@ -81,7 +81,8 @@ public class TestRowDBWritable extends TestRow implements DBWritable, Writable {
     name = in.readUTF();
   }
 
-  static class PrepareStatementFromTestRow implements JdbcIO.PreparedStatementSetter<TestRow> {
+  private static class PrepareStatementFromTestRow
+      implements JdbcIO.PreparedStatementSetter<TestRow> {
     @Override
     public void setParameters(TestRow element, PreparedStatement statement) throws SQLException {
       statement.setLong(1, element.id());