You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/04/27 19:50:47 UTC

[incubator-hudi] branch master updated: [HUDI-819] Fix a bug with MergeOnReadLazyInsertIterable.

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6de9f5d  [HUDI-819] Fix a bug with MergeOnReadLazyInsertIterable.
6de9f5d is described below

commit 6de9f5d9e5cb1f82d7c32d04b114e7d4a181619b
Author: satishkotha <sa...@uber.com>
AuthorDate: Mon Apr 27 12:50:39 2020 -0700

    [HUDI-819] Fix a bug with MergeOnReadLazyInsertIterable.
    
    Variable declared here[1] masks protected statuses variable. So although hoodie writes data, will not include writestatus in the completed section. This can cause duplicates being written (#1540)
    [1] https://github.com/apache/incubator-hudi/blob/master/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java#L53
---
 .../hudi/execution/BulkInsertMapFunction.java      |  2 +-
 ...InsertIterable.java => LazyInsertIterable.java} | 35 +++++-----
 .../execution/MergeOnReadLazyInsertIterable.java   | 74 ----------------------
 .../org/apache/hudi/io/AppendHandleFactory.java    | 36 +++++++++++
 .../org/apache/hudi/io/CreateHandleFactory.java    | 36 +++++++++++
 .../org/apache/hudi/io/WriteHandleFactory.java     | 35 ++++++++++
 .../table/action/commit/CommitActionExecutor.java  |  4 +-
 .../deltacommit/DeltaCommitActionExecutor.java     |  7 +-
 .../execution/TestBoundedInMemoryExecutor.java     |  4 +-
 .../hudi/execution/TestBoundedInMemoryQueue.java   |  4 +-
 10 files changed, 138 insertions(+), 99 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java b/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java
index 5d4391c..67c1d75 100644
--- a/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java
+++ b/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java
@@ -50,7 +50,7 @@ public class BulkInsertMapFunction<T extends HoodieRecordPayload>
 
   @Override
   public Iterator<List<WriteStatus>> call(Integer partition, Iterator<HoodieRecord<T>> sortedRecordItr) {
-    return new CopyOnWriteLazyInsertIterable<>(sortedRecordItr, config, instantTime, hoodieTable,
+    return new LazyInsertIterable<>(sortedRecordItr, config, instantTime, hoodieTable,
         fileIDPrefixes.get(partition), hoodieTable.getSparkTaskContextSupplier());
   }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java b/hudi-client/src/main/java/org/apache/hudi/execution/LazyInsertIterable.java
similarity index 81%
rename from hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java
rename to hudi-client/src/main/java/org/apache/hudi/execution/LazyInsertIterable.java
index 8f98496..fe0d5c4 100644
--- a/hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/execution/LazyInsertIterable.java
@@ -28,7 +28,8 @@ import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
 import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.io.HoodieCreateHandle;
+import org.apache.hudi.io.CreateHandleFactory;
+import org.apache.hudi.io.WriteHandleFactory;
 import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.table.HoodieTable;
 
@@ -43,26 +44,34 @@ import java.util.function.Function;
 /**
  * Lazy Iterable, that writes a stream of HoodieRecords sorted by the partitionPath, into new files.
  */
-public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload>
+public class LazyInsertIterable<T extends HoodieRecordPayload>
     extends LazyIterableIterator<HoodieRecord<T>, List<WriteStatus>> {
 
   protected final HoodieWriteConfig hoodieConfig;
   protected final String instantTime;
   protected final HoodieTable<T> hoodieTable;
   protected final String idPrefix;
-  protected int numFilesWritten;
   protected SparkTaskContextSupplier sparkTaskContextSupplier;
+  protected WriteHandleFactory<T> writeHandleFactory;
 
-  public CopyOnWriteLazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
-                                       String instantTime, HoodieTable<T> hoodieTable, String idPrefix,
-                                       SparkTaskContextSupplier sparkTaskContextSupplier) {
+  public LazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
+                            String instantTime, HoodieTable<T> hoodieTable, String idPrefix,
+                            SparkTaskContextSupplier sparkTaskContextSupplier) {
+    this(sortedRecordItr, config, instantTime, hoodieTable, idPrefix, sparkTaskContextSupplier,
+        new CreateHandleFactory<>());
+  }
+
+  public LazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
+                            String instantTime, HoodieTable<T> hoodieTable, String idPrefix,
+                            SparkTaskContextSupplier sparkTaskContextSupplier,
+                            WriteHandleFactory<T> writeHandleFactory) {
     super(sortedRecordItr);
     this.hoodieConfig = config;
     this.instantTime = instantTime;
     this.hoodieTable = hoodieTable;
     this.idPrefix = idPrefix;
-    this.numFilesWritten = 0;
     this.sparkTaskContextSupplier = sparkTaskContextSupplier;
+    this.writeHandleFactory = writeHandleFactory;
   }
 
   // Used for caching HoodieRecord along with insertValue. We need this to offload computation work to buffering thread.
@@ -118,10 +127,6 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload>
   @Override
   protected void end() {}
 
-  protected String getNextFileId(String idPfx) {
-    return String.format("%s-%d", idPfx, numFilesWritten++);
-  }
-
   protected CopyOnWriteInsertHandler getInsertHandler() {
     return new CopyOnWriteInsertHandler();
   }
@@ -140,8 +145,8 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload>
       final HoodieRecord insertPayload = payload.record;
       // lazily initialize the handle, for the first time
       if (handle == null) {
-        handle = new HoodieCreateHandle(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(),
-            getNextFileId(idPrefix), sparkTaskContextSupplier);
+        handle = writeHandleFactory.create(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(),
+            idPrefix, sparkTaskContextSupplier);
       }
 
       if (handle.canWrite(payload.record)) {
@@ -151,8 +156,8 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload>
         // handle is full.
         statuses.add(handle.close());
         // Need to handle the rejected payload & open new handle
-        handle = new HoodieCreateHandle(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(),
-            getNextFileId(idPrefix), sparkTaskContextSupplier);
+        handle = writeHandleFactory.create(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(),
+            idPrefix, sparkTaskContextSupplier);
         handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload.
       }
     }
diff --git a/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java b/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java
deleted file mode 100644
index 02a9ead..0000000
--- a/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.execution;
-
-import org.apache.hudi.client.SparkTaskContextSupplier;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.io.HoodieAppendHandle;
-import org.apache.hudi.table.HoodieTable;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Lazy Iterable, that writes a stream of HoodieRecords sorted by the partitionPath, into new log files.
- */
-public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extends CopyOnWriteLazyInsertIterable<T> {
-
-  public MergeOnReadLazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
-      String instantTime, HoodieTable<T> hoodieTable, String idPfx, SparkTaskContextSupplier sparkTaskContextSupplier) {
-    super(sortedRecordItr, config, instantTime, hoodieTable, idPfx, sparkTaskContextSupplier);
-  }
-
-  @Override
-  protected CopyOnWriteInsertHandler getInsertHandler() {
-    return new MergeOnReadInsertHandler();
-  }
-
-  protected class MergeOnReadInsertHandler extends CopyOnWriteInsertHandler {
-
-    @Override
-    protected void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
-      final HoodieRecord insertPayload = payload.record;
-      List<WriteStatus> statuses = new ArrayList<>();
-      // lazily initialize the handle, for the first time
-      if (handle == null) {
-        handle = new HoodieAppendHandle(hoodieConfig, instantTime, hoodieTable,
-                insertPayload.getPartitionPath(), getNextFileId(idPrefix), sparkTaskContextSupplier);
-      }
-      if (handle.canWrite(insertPayload)) {
-        // write the payload, if the handle has capacity
-        handle.write(insertPayload, payload.insertValue, payload.exception);
-      } else {
-        // handle is full.
-        handle.close();
-        statuses.add(handle.getWriteStatus());
-        // Need to handle the rejected payload & open new handle
-        handle = new HoodieAppendHandle(hoodieConfig, instantTime, hoodieTable,
-                insertPayload.getPartitionPath(), getNextFileId(idPrefix), sparkTaskContextSupplier);
-        handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload.
-      }
-    }
-  }
-
-}
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/AppendHandleFactory.java b/hudi-client/src/main/java/org/apache/hudi/io/AppendHandleFactory.java
new file mode 100644
index 0000000..4a5554b
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/io/AppendHandleFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+public class AppendHandleFactory<T extends HoodieRecordPayload> extends WriteHandleFactory<T> {
+
+  @Override
+  public HoodieAppendHandle<T> create(final HoodieWriteConfig hoodieConfig, final String commitTime,
+                                     final HoodieTable<T> hoodieTable, final String partitionPath,
+                                     final String fileIdPrefix, final SparkTaskContextSupplier sparkTaskContextSupplier) {
+
+    return new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable, partitionPath,
+        getNextFileId(fileIdPrefix), sparkTaskContextSupplier);
+  }
+}
\ No newline at end of file
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/CreateHandleFactory.java b/hudi-client/src/main/java/org/apache/hudi/io/CreateHandleFactory.java
new file mode 100644
index 0000000..68d8b4d
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/io/CreateHandleFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+public class CreateHandleFactory<T extends HoodieRecordPayload> extends WriteHandleFactory<T> {
+
+  @Override
+  public HoodieWriteHandle<T> create(final HoodieWriteConfig hoodieConfig, final String commitTime,
+                                     final HoodieTable<T> hoodieTable, final String partitionPath,
+                                     final String fileIdPrefix, SparkTaskContextSupplier sparkTaskContextSupplier) {
+
+    return new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, partitionPath,
+        getNextFileId(fileIdPrefix), sparkTaskContextSupplier);
+  }
+}
\ No newline at end of file
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/WriteHandleFactory.java b/hudi-client/src/main/java/org/apache/hudi/io/WriteHandleFactory.java
new file mode 100644
index 0000000..7039b71
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/io/WriteHandleFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+public abstract class WriteHandleFactory<T extends HoodieRecordPayload> {
+  private int numFilesWritten = 0;
+
+  public abstract HoodieWriteHandle<T> create(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
+      String partitionPath, String fileIdPrefix, SparkTaskContextSupplier sparkTaskContextSupplier);
+
+  protected String getNextFileId(String idPfx) {
+    return String.format("%s-%d", idPfx, numFilesWritten++);
+  }
+}
\ No newline at end of file
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
index b958837..5208c12 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
@@ -29,7 +29,7 @@ import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.execution.CopyOnWriteLazyInsertIterable;
+import org.apache.hudi.execution.LazyInsertIterable;
 import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.table.HoodieTable;
@@ -132,7 +132,7 @@ public abstract class CommitActionExecutor<T extends HoodieRecordPayload<T>>
       LOG.info("Empty partition");
       return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
     }
-    return new CopyOnWriteLazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable<T>)table, idPfx,
+    return new LazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable<T>)table, idPfx,
         sparkTaskContextSupplier);
   }
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java
index 775580e..be3806e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java
@@ -24,8 +24,9 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.execution.MergeOnReadLazyInsertIterable;
+import org.apache.hudi.execution.LazyInsertIterable;
 import org.apache.hudi.io.HoodieAppendHandle;
+import org.apache.hudi.io.AppendHandleFactory;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
 
@@ -84,8 +85,8 @@ public abstract class DeltaCommitActionExecutor<T extends HoodieRecordPayload<T>
       throws Exception {
     // If canIndexLogFiles, write inserts to log files else write inserts to parquet files
     if (table.getIndex().canIndexLogFiles()) {
-      return new MergeOnReadLazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable<T>)table, idPfx,
-          sparkTaskContextSupplier);
+      return new LazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable<T>)table, idPfx,
+          sparkTaskContextSupplier, new AppendHandleFactory<>());
     } else {
       return super.handleInsert(idPfx, recordItr);
     }
diff --git a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java
index 58f898c..1db248f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java
@@ -25,7 +25,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.HoodieInsertValueGenResult;
+import org.apache.hudi.execution.LazyInsertIterable.HoodieInsertValueGenResult;
 
 import org.apache.avro.generic.IndexedRecord;
 import org.junit.After;
@@ -37,7 +37,7 @@ import java.util.List;
 
 import scala.Tuple2;
 
-import static org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.getTransformFunction;
+import static org.apache.hudi.execution.LazyInsertIterable.getTransformFunction;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
diff --git a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
index 72b3eff..859cf4e 100644
--- a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
+++ b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
@@ -31,7 +31,7 @@ import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
 import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
 import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.HoodieInsertValueGenResult;
+import org.apache.hudi.execution.LazyInsertIterable.HoodieInsertValueGenResult;
 
 import org.apache.avro.generic.IndexedRecord;
 import org.junit.After;
@@ -53,7 +53,7 @@ import java.util.stream.IntStream;
 
 import scala.Tuple2;
 
-import static org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.getTransformFunction;
+import static org.apache.hudi.execution.LazyInsertIterable.getTransformFunction;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;