You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2020/03/02 16:12:56 UTC
[atlas] 02/02: ATLAS-3643: PC Fx: StatusReporter: Introduce Status
Reporting to Be used With PC Framework
This is an automated email from the ASF dual-hosted git repository.
amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
commit ed2755dcce37318ffce21e5ecd7692416aa49a1d
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Mon Mar 2 08:05:55 2020 -0800
ATLAS-3643: PC Fx: StatusReporter: Introduce Status Reporting to Be used With PC Framework
---
.../java/org/apache/atlas/pc/StatusReporter.java | 75 +++++++++++++++++
.../org/apache/atlas/pc/StatusReporterTest.java | 94 ++++++++++++++++++++++
2 files changed, 169 insertions(+)
diff --git a/intg/src/main/java/org/apache/atlas/pc/StatusReporter.java b/intg/src/main/java/org/apache/atlas/pc/StatusReporter.java
new file mode 100644
index 0000000..f84e8d0
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/pc/StatusReporter.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.pc;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+/** Tracks items in production order and acknowledges the processed prefix; performs no synchronization. */
+public class StatusReporter<T, U> {
+    private final Map<T, U> producedItems = new LinkedHashMap<>(); // iteration order == production order
+    private final Set<T> processedSet = new HashSet<>();
+
+    /** Records that {@code item} was produced, tagged with {@code index}. */
+    public void produced(T item, U index) {
+        this.producedItems.put(item, index);
+    }
+
+    /** Marks a single item as processed. */
+    public void processed(T item) {
+        this.processedSet.add(item);
+    }
+
+    /** Marks every item in the {@code index} array as processed. */
+    public void processed(T[] index) {
+        this.processedSet.addAll(Arrays.asList(index));
+    }
+
+    /**
+     * Drains the leading produced items that have already been processed.
+     *
+     * @return index of the last item drained by this call, or {@code null} if nothing was drained
+     */
+    public U ack() {
+        U ack = null;
+        U ret;
+        while ((ret = completionIndex(getFirstElement(this.producedItems))) != null) {
+            ack = ret;
+        }
+        return ack;
+    }
+
+    private Map.Entry<T, U> getFirstElement(Map<T, U> map) {
+        return map.isEmpty() ? null : map.entrySet().iterator().next();
+    }
+
+    /** If the entry's key is processed, forgets it in both collections and returns its index; else null. */
+    private U completionIndex(Map.Entry<T, U> lookFor) {
+        if (lookFor == null || !processedSet.contains(lookFor.getKey())) {
+            return null;
+        }
+        U ack = lookFor.getValue();
+        producedItems.remove(lookFor.getKey());
+        processedSet.remove(lookFor.getKey()); // remove by key: the set holds T, never Map.Entry
+        return ack;
+    }
+}
diff --git a/intg/src/test/java/org/apache/atlas/pc/StatusReporterTest.java b/intg/src/test/java/org/apache/atlas/pc/StatusReporterTest.java
new file mode 100644
index 0000000..3e50562
--- /dev/null
+++ b/intg/src/test/java/org/apache/atlas/pc/StatusReporterTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.pc;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.BlockingQueue;
+
+import static org.testng.Assert.assertEquals;
+
+/** Feeds integers through a WorkItemManager and checks that StatusReporter acknowledges the last one. */
+public class StatusReporterTest {
+    private static class IntegerConsumer extends WorkItemConsumer<Integer> {
+        private Integer current;
+
+        public IntegerConsumer(BlockingQueue<Integer> queue) {
+            super(queue);
+        }
+
+        @Override
+        protected void doCommit() {
+            addResult(current);
+        }
+
+        @Override
+        protected void processItem(Integer item) {
+            try {
+                this.current = item;
+                Thread.sleep(20 + RandomUtils.nextInt(5, 7));
+                super.commit();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // restore the interrupt status instead of swallowing it
+            }
+        }
+    }
+
+    private static class IntegerConsumerBuilder implements WorkItemBuilder<IntegerConsumer, Integer> {
+        @Override
+        public IntegerConsumer build(BlockingQueue<Integer> queue) {
+            return new IntegerConsumer(queue);
+        }
+    }
+
+    private WorkItemManager<Integer, WorkItemConsumer> getWorkItemManger(IntegerConsumerBuilder cb, int numWorkers) {
+        return new WorkItemManager<>(cb, "IntegerConsumer", 5, numWorkers, true);
+    }
+
+    @Test
+    public void statusReporting() throws InterruptedException {
+        final int maxItems = 50;
+        IntegerConsumerBuilder cb = new IntegerConsumerBuilder();
+        WorkItemManager<Integer, WorkItemConsumer> wi = getWorkItemManger(cb, 5);
+        StatusReporter<Integer, Integer> statusReporter = new StatusReporter<>();
+
+        try {
+            for (int i = 0; i < maxItems; i++) {
+                wi.produce(i);
+                statusReporter.produced(i, i);
+                extractResults(wi, statusReporter);
+            }
+            wi.drain();
+            extractResults(wi, statusReporter);
+            assertEquals(statusReporter.ack().intValue(), (maxItems - 1));
+        } finally {
+            wi.shutdown(); // runs even when the assertion above fails
+        }
+    }
+
+    /** Polls {@code wi}'s result queue until it is empty, marking each Integer result as processed. */
+    private void extractResults(WorkItemManager<Integer, WorkItemConsumer> wi, StatusReporter<Integer, Integer> statusReporter) {
+        Object result;
+        while ((result = wi.getResults().poll()) != null) {
+            if (result instanceof Integer) {
+                statusReporter.processed((Integer) result);
+            }
+        }
+    }
+}