You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2016/04/04 19:32:31 UTC
[1/2] nifi git commit: NIFI-1442: This closes #306. Use
CircularFifoQueue instead of Set to store nodes' bulletins Joint effort by
Toivo Adams from PR306 and and Mark Payne
Repository: nifi
Updated Branches:
refs/heads/support/nifi-0.6.x 15bab7dda -> 980317ba0
NIFI-1442: This closes #306. Use CircularFifoQueue instead of Set to store nodes' bulletins
Joint effort by Toivo Adams from PR306 and and Mark Payne
Signed-off-by: joewitt <jo...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/139199da
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/139199da
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/139199da
Branch: refs/heads/support/nifi-0.6.x
Commit: 139199da9dc35e59e40226a3b926aba6fca0bb46
Parents: 15bab7d
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Mar 28 10:35:44 2016 -0400
Committer: joewitt <jo...@apache.org>
Committed: Mon Apr 4 13:24:31 2016 -0400
----------------------------------------------------------------------
.../events/NodeBulletinProcessingStrategy.java | 40 ++++------------
.../TestNodeBulletinProcessingStrategy.java | 49 ++++++++++++++++++++
2 files changed, 57 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/139199da/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/NodeBulletinProcessingStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/NodeBulletinProcessingStrategy.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/NodeBulletinProcessingStrategy.java
index d3cfd9e..8c00d64 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/NodeBulletinProcessingStrategy.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/NodeBulletinProcessingStrategy.java
@@ -17,50 +17,26 @@
package org.apache.nifi.events;
import java.util.HashSet;
-import java.util.LinkedHashSet;
import java.util.Set;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.nifi.reporting.Bulletin;
/**
*
*/
public class NodeBulletinProcessingStrategy implements BulletinProcessingStrategy {
-
- private final Lock lock;
- private final Set<Bulletin> bulletins;
-
- public NodeBulletinProcessingStrategy() {
- lock = new ReentrantLock();
- bulletins = new LinkedHashSet<>();
- }
+ static final int MAX_ENTRIES = 5;
+ private final CircularFifoQueue<Bulletin> ringBuffer = new CircularFifoQueue<>(MAX_ENTRIES);
@Override
- public void update(final Bulletin bulletin) {
- lock.lock();
- try {
- bulletins.add(bulletin);
- } finally {
- lock.unlock();
- }
+ public synchronized void update(final Bulletin bulletin) {
+ ringBuffer.add(bulletin);
}
- public Set<Bulletin> getBulletins() {
- final Set<Bulletin> response = new HashSet<>();
-
- lock.lock();
- try {
- // get all the bulletins currently stored
- response.addAll(bulletins);
-
- // remove the bulletins
- bulletins.clear();
- } finally {
- lock.unlock();
- }
-
+ public synchronized Set<Bulletin> getBulletins() {
+ final Set<Bulletin> response = new HashSet<>(ringBuffer);
+ ringBuffer.clear();
return response;
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/139199da/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/events/TestNodeBulletinProcessingStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/events/TestNodeBulletinProcessingStrategy.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/events/TestNodeBulletinProcessingStrategy.java
new file mode 100644
index 0000000..394c940
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/events/TestNodeBulletinProcessingStrategy.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.events;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+public class TestNodeBulletinProcessingStrategy {
+
+ @Test
+ public void testUpdate() {
+
+ NodeBulletinProcessingStrategy nBulletinProcessingStrategy = new NodeBulletinProcessingStrategy();
+
+ nBulletinProcessingStrategy.update(new ComponentBulletin(1));
+ nBulletinProcessingStrategy.update(new ComponentBulletin(2));
+ nBulletinProcessingStrategy.update(new ComponentBulletin(3));
+ nBulletinProcessingStrategy.update(new ComponentBulletin(4));
+ nBulletinProcessingStrategy.update(new ComponentBulletin(5));
+ assertEquals(5, nBulletinProcessingStrategy.getBulletins().size());
+
+ nBulletinProcessingStrategy.update(new ComponentBulletin(1));
+ nBulletinProcessingStrategy.update(new ComponentBulletin(2));
+ nBulletinProcessingStrategy.update(new ComponentBulletin(3));
+ nBulletinProcessingStrategy.update(new ComponentBulletin(4));
+ nBulletinProcessingStrategy.update(new ComponentBulletin(5));
+ nBulletinProcessingStrategy.update(new ComponentBulletin(6));
+ nBulletinProcessingStrategy.update(new ComponentBulletin(7));
+ assertEquals(NodeBulletinProcessingStrategy.MAX_ENTRIES, nBulletinProcessingStrategy.getBulletins().size());
+
+ }
+
+}
\ No newline at end of file
[2/2] nifi git commit: NIFI-1691: Add Fetch Size property to
QueryDatabaseTable
Posted by jo...@apache.org.
NIFI-1691: Add Fetch Size property to QueryDatabaseTable
This closes #307
Signed-off-by: Bryan Bende <bb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/980317ba
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/980317ba
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/980317ba
Branch: refs/heads/support/nifi-0.6.x
Commit: 980317ba04c55d0b145747e4db9310d9911956cb
Parents: 139199d
Author: Matt Burgess <ma...@apache.org>
Authored: Mon Mar 28 12:16:24 2016 -0400
Committer: joewitt <jo...@apache.org>
Committed: Mon Apr 4 13:27:47 2016 -0400
----------------------------------------------------------------------
.../processors/standard/QueryDatabaseTable.java | 21 ++++++++++++++++++++
.../standard/QueryDatabaseTableTest.java | 1 +
2 files changed, 22 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/980317ba/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 08f6b41..9403eb8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -175,6 +175,15 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
.defaultValue("None")
.build();
+ public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
+ .name("Fetch Size")
+ .description("The number of result rows to be fetched from the result set at a time. This is a hint to the driver and may not be "
+ + "honored and/or exact. If the value specified is zero, then the hint is ignored.")
+ .defaultValue("0")
+ .required(true)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .build();
+
private final List<PropertyDescriptor> propDescriptors;
@@ -192,6 +201,7 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
pds.add(MAX_VALUE_COLUMN_NAMES);
pds.add(QUERY_TIMEOUT);
pds.add(SQL_PREPROCESS_STRATEGY);
+ pds.add(FETCH_SIZE);
propDescriptors = Collections.unmodifiableList(pds);
}
@@ -251,6 +261,8 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
final String columnNames = context.getProperty(COLUMN_NAMES).getValue();
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
final String preProcessStrategy = context.getProperty(SQL_PREPROCESS_STRATEGY).getValue();
+ final Integer fetchSize = context.getProperty(FETCH_SIZE).asInteger();
+
final StateManager stateManager = context.getStateManager();
final StateMap stateMap;
@@ -272,6 +284,15 @@ public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
try (final Connection con = dbcpService.getConnection();
final Statement st = con.createStatement()) {
+ if (fetchSize != null && fetchSize > 0) {
+ try {
+ st.setFetchSize(fetchSize);
+ } catch (SQLException se) {
+ // Not all drivers support this, just log the error (at debug level) and move on
+ logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se);
+ }
+ }
+
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
st.setQueryTimeout(queryTimeout); // timeout in seconds
http://git-wip-us.apache.org/repos/asf/nifi/blob/980317ba/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index d16b9c6..f932e4d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -154,6 +154,7 @@ public class QueryDatabaseTableTest {
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
InputStream in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+ runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2");
assertEquals(3, getNumberOfRecordsFromStream(in));
runner.clearTransferState();