You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2016/04/04 19:32:31 UTC

[1/2] nifi git commit: NIFI-1442: This closes #306. Use CircularFifoQueue instead of Set to store nodes' bulletins Joint effort by Toivo Adams from PR306 and Mark Payne

Repository: nifi
Updated Branches:
  refs/heads/support/nifi-0.6.x 15bab7dda -> 980317ba0


NIFI-1442: This closes #306. Use CircularFifoQueue instead of Set to store nodes' bulletins
Joint effort by Toivo Adams from PR306 and Mark Payne

Signed-off-by: joewitt <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/139199da
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/139199da
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/139199da

Branch: refs/heads/support/nifi-0.6.x
Commit: 139199da9dc35e59e40226a3b926aba6fca0bb46
Parents: 15bab7d
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Mar 28 10:35:44 2016 -0400
Committer: joewitt <jo...@apache.org>
Committed: Mon Apr 4 13:24:31 2016 -0400

----------------------------------------------------------------------
 .../events/NodeBulletinProcessingStrategy.java  | 40 ++++------------
 .../TestNodeBulletinProcessingStrategy.java     | 49 ++++++++++++++++++++
 2 files changed, 57 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/139199da/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/NodeBulletinProcessingStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/NodeBulletinProcessingStrategy.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/NodeBulletinProcessingStrategy.java
index d3cfd9e..8c00d64 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/NodeBulletinProcessingStrategy.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/NodeBulletinProcessingStrategy.java
@@ -17,50 +17,26 @@
 package org.apache.nifi.events;
 
 import java.util.HashSet;
-import java.util.LinkedHashSet;
 import java.util.Set;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.commons.collections4.queue.CircularFifoQueue;
 import org.apache.nifi.reporting.Bulletin;
 
 /**
  *
  */
 public class NodeBulletinProcessingStrategy implements BulletinProcessingStrategy {
-
-    private final Lock lock;
-    private final Set<Bulletin> bulletins;
-
-    public NodeBulletinProcessingStrategy() {
-        lock = new ReentrantLock();
-        bulletins = new LinkedHashSet<>();
-    }
+    static final int MAX_ENTRIES = 5;
+    private final CircularFifoQueue<Bulletin> ringBuffer = new CircularFifoQueue<>(MAX_ENTRIES);
 
     @Override
-    public void update(final Bulletin bulletin) {
-        lock.lock();
-        try {
-            bulletins.add(bulletin);
-        } finally {
-            lock.unlock();
-        }
+    public synchronized void update(final Bulletin bulletin) {
+        ringBuffer.add(bulletin);
     }
 
-    public Set<Bulletin> getBulletins() {
-        final Set<Bulletin> response = new HashSet<>();
-
-        lock.lock();
-        try {
-            // get all the bulletins currently stored
-            response.addAll(bulletins);
-
-            // remove the bulletins
-            bulletins.clear();
-        } finally {
-            lock.unlock();
-        }
-
+    public synchronized Set<Bulletin> getBulletins() {
+        final Set<Bulletin> response = new HashSet<>(ringBuffer);
+        ringBuffer.clear();
         return response;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/139199da/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/events/TestNodeBulletinProcessingStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/events/TestNodeBulletinProcessingStrategy.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/events/TestNodeBulletinProcessingStrategy.java
new file mode 100644
index 0000000..394c940
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/events/TestNodeBulletinProcessingStrategy.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.events;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+public class TestNodeBulletinProcessingStrategy {
+
+    @Test
+    public void testUpdate() {
+
+        NodeBulletinProcessingStrategy nBulletinProcessingStrategy = new NodeBulletinProcessingStrategy();
+
+        nBulletinProcessingStrategy.update(new ComponentBulletin(1));
+        nBulletinProcessingStrategy.update(new ComponentBulletin(2));
+        nBulletinProcessingStrategy.update(new ComponentBulletin(3));
+        nBulletinProcessingStrategy.update(new ComponentBulletin(4));
+        nBulletinProcessingStrategy.update(new ComponentBulletin(5));
+        assertEquals(5, nBulletinProcessingStrategy.getBulletins().size());
+
+        nBulletinProcessingStrategy.update(new ComponentBulletin(1));
+        nBulletinProcessingStrategy.update(new ComponentBulletin(2));
+        nBulletinProcessingStrategy.update(new ComponentBulletin(3));
+        nBulletinProcessingStrategy.update(new ComponentBulletin(4));
+        nBulletinProcessingStrategy.update(new ComponentBulletin(5));
+        nBulletinProcessingStrategy.update(new ComponentBulletin(6));
+        nBulletinProcessingStrategy.update(new ComponentBulletin(7));
+        assertEquals(NodeBulletinProcessingStrategy.MAX_ENTRIES, nBulletinProcessingStrategy.getBulletins().size());
+
+    }
+
+}
\ No newline at end of file


[2/2] nifi git commit: NIFI-1691: Add Fetch Size property to QueryDatabaseTable

Posted by jo...@apache.org.
NIFI-1691: Add Fetch Size property to QueryDatabaseTable

This closes #307

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/980317ba
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/980317ba
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/980317ba

Branch: refs/heads/support/nifi-0.6.x
Commit: 980317ba04c55d0b145747e4db9310d9911956cb
Parents: 139199d
Author: Matt Burgess <ma...@apache.org>
Authored: Mon Mar 28 12:16:24 2016 -0400
Committer: joewitt <jo...@apache.org>
Committed: Mon Apr 4 13:27:47 2016 -0400

----------------------------------------------------------------------
 .../processors/standard/QueryDatabaseTable.java | 21 ++++++++++++++++++++
 .../standard/QueryDatabaseTableTest.java        |  1 +
 2 files changed, 22 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/980317ba/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 08f6b41..9403eb8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -175,6 +175,15 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
             .defaultValue("None")
             .build();
 
+    public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Fetch Size")
+            .description("The number of result rows to be fetched from the result set at a time. This is a hint to the driver and may not be "
+                    + "honored and/or exact. If the value specified is zero, then the hint is ignored.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+
 
     private final List<PropertyDescriptor> propDescriptors;
 
@@ -192,6 +201,7 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
         pds.add(MAX_VALUE_COLUMN_NAMES);
         pds.add(QUERY_TIMEOUT);
         pds.add(SQL_PREPROCESS_STRATEGY);
+        pds.add(FETCH_SIZE);
         propDescriptors = Collections.unmodifiableList(pds);
     }
 
@@ -251,6 +261,8 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
         final String columnNames = context.getProperty(COLUMN_NAMES).getValue();
         final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
         final String preProcessStrategy = context.getProperty(SQL_PREPROCESS_STRATEGY).getValue();
+        final Integer fetchSize = context.getProperty(FETCH_SIZE).asInteger();
+
         final StateManager stateManager = context.getStateManager();
         final StateMap stateMap;
 
@@ -272,6 +284,15 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
         try (final Connection con = dbcpService.getConnection();
              final Statement st = con.createStatement()) {
 
+            if (fetchSize != null && fetchSize > 0) {
+                try {
+                    st.setFetchSize(fetchSize);
+                } catch (SQLException se) {
+                    // Not all drivers support this, just log the error (at debug level) and move on
+                    logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se);
+                }
+            }
+
             final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
             st.setQueryTimeout(queryTimeout); // timeout in seconds
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/980317ba/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index d16b9c6..f932e4d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -154,6 +154,7 @@ public class QueryDatabaseTableTest {
         runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
 
         InputStream in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2");
         assertEquals(3, getNumberOfRecordsFromStream(in));
         runner.clearTransferState();