You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2020/03/02 16:12:56 UTC

[atlas] 02/02: ATLAS-3643: PC Fx: StatusReporter: Introduce Status Reporting to Be used With PC Framework

This is an automated email from the ASF dual-hosted git repository.

amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit ed2755dcce37318ffce21e5ecd7692416aa49a1d
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Mon Mar 2 08:05:55 2020 -0800

    ATLAS-3643: PC Fx: StatusReporter: Introduce Status Reporting to Be used With PC Framework
---
 .../java/org/apache/atlas/pc/StatusReporter.java   | 75 +++++++++++++++++
 .../org/apache/atlas/pc/StatusReporterTest.java    | 94 ++++++++++++++++++++++
 2 files changed, 169 insertions(+)

diff --git a/intg/src/main/java/org/apache/atlas/pc/StatusReporter.java b/intg/src/main/java/org/apache/atlas/pc/StatusReporter.java
new file mode 100644
index 0000000..f84e8d0
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/pc/StatusReporter.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.pc;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Tracks items in production order and reports the index up to which every item has been
+ * processed. Not synchronized; callers sharing an instance across threads must coordinate access.
+ * @param <T> type of the tracked item
+ * @param <U> type of the index associated with each item
+ */
+public class StatusReporter<T, U> {
+    private final Map<T, U> producedItems = new LinkedHashMap<>(); // iteration order == production order
+    private final Set<T> processedSet = new HashSet<>();
+
+    /** Records that {@code item} was produced at position {@code index}. */
+    public void produced(T item, U index) {
+        this.producedItems.put(item, index);
+    }
+
+    /** Marks a single {@code item} as processed. */
+    public void processed(T item) {
+        this.processedSet.add(item);
+    }
+
+    /** Marks every item in {@code index} as processed. */
+    public void processed(T[] index) {
+        this.processedSet.addAll(Arrays.asList(index));
+    }
+
+    /** Retires the leading run of processed items; returns the last retired index, or {@code null} if none. */
+    public U ack() {
+        U ack = null;
+        U ret;
+        while ((ret = completionIndex(getFirstElement(this.producedItems))) != null) {
+            ack = ret;
+        }
+        return ack;
+    }
+
+    /** Returns the oldest outstanding produced entry, or {@code null} when {@code map} is empty. */
+    private Map.Entry<T, U> getFirstElement(Map<T, U> map) {
+        return map.isEmpty() ? null : map.entrySet().iterator().next();
+    }
+
+    /** Retires {@code lookFor} if its item was processed and returns its index; otherwise {@code null}. */
+    private U completionIndex(Map.Entry<T, U> lookFor) {
+        // The set holds items, not map entries, so it must be pruned by key.
+        if (lookFor == null || !processedSet.remove(lookFor.getKey())) {
+            return null;
+        }
+        return producedItems.remove(lookFor.getKey()); // remove() hands back the index mapped to the item
+    }
+}
diff --git a/intg/src/test/java/org/apache/atlas/pc/StatusReporterTest.java b/intg/src/test/java/org/apache/atlas/pc/StatusReporterTest.java
new file mode 100644
index 0000000..3e50562
--- /dev/null
+++ b/intg/src/test/java/org/apache/atlas/pc/StatusReporterTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.pc;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.BlockingQueue;
+
+import static org.testng.Assert.assertEquals;
+
+/** Exercises {@link StatusReporter} with results collected from a multi-worker {@code WorkItemManager}. */
+public class StatusReporterTest {
+    /** Consumer that sleeps briefly per item and passes the most recent item to {@code addResult} on commit. */
+    private static class IntegerConsumer extends WorkItemConsumer<Integer> {
+        private Integer current;
+
+        public IntegerConsumer(BlockingQueue<Integer> queue) {
+            super(queue);
+        }
+
+        @Override
+        protected void doCommit() {
+            addResult(current);
+        }
+
+        @Override
+        protected void processItem(Integer item) {
+            try {
+                this.current = item;
+                Thread.sleep(20 + RandomUtils.nextInt(5, 7));
+                super.commit();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // restore the flag instead of swallowing the interrupt
+            }
+        }
+    }
+
+    private static class IntegerConsumerBuilder implements WorkItemBuilder<IntegerConsumer, Integer> {
+        @Override
+        public IntegerConsumer build(BlockingQueue<Integer> queue) {
+            return new IntegerConsumer(queue);
+        }
+    }
+
+    private WorkItemManager<Integer, WorkItemConsumer> getWorkItemManger(IntegerConsumerBuilder cb, int numWorkers) {
+        return new WorkItemManager<>(cb, "IntegerConsumer", 5, numWorkers, true);
+    }
+
+    /** Produces {@code maxItems} integers and expects the final ack to be the index of the last one. */
+    @Test
+    public void statusReporting() throws InterruptedException {
+        final int maxItems = 50;
+        WorkItemManager<Integer, WorkItemConsumer> wi = getWorkItemManger(new IntegerConsumerBuilder(), 5);
+        StatusReporter<Integer, Integer> statusReporter = new StatusReporter<>();
+        try {
+            for (int i = 0; i < maxItems; i++) {
+                wi.produce(i);
+                statusReporter.produced(i, i);
+                extractResults(wi, statusReporter);
+            }
+            wi.drain();
+            extractResults(wi, statusReporter);
+            assertEquals(statusReporter.ack().intValue(), (maxItems - 1));
+        } finally {
+            wi.shutdown(); // shut the manager down even when the assertion above fails
+        }
+    }
+
+    /** Feeds every {@code Integer} currently available from {@code wi.getResults()} to {@code statusReporter}. */
+    private void extractResults(WorkItemManager<Integer, WorkItemConsumer> wi, StatusReporter<Integer, Integer> statusReporter) {
+        Object result;
+        while ((result = wi.getResults().poll()) != null) { // loop ends on the first null from poll()
+            if (result instanceof Integer) {
+                statusReporter.processed((Integer) result);
+            }
+        }
+    }
+}