You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2022/06/03 13:19:41 UTC

[tika] branch main updated (6d5fec611 -> 184cf7602)

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


    from 6d5fec611 Merge remote-tracking branch 'origin/main' into main
     new 382c1b399 TIKA-3782 -- improve logging and idcolumn handling in jdbc pipes iterator
     new 184cf7602 TIKA-3785 -- align pipes-iterator-csv with pipes-jdbc behavior

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../pipes/pipesiterator/csv/CSVPipesIterator.java  | 81 +++++++++++-----------
 .../pipesiterator/jdbc/JDBCPipesIterator.java      | 66 ++++++++++++++----
 .../src/test/resources/log4j.properties            | 22 ------
 .../src/test/resources/log4j2.xml                  |  0
 4 files changed, 92 insertions(+), 77 deletions(-)
 delete mode 100644 tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/test/resources/log4j.properties
 copy {tika-core => tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc}/src/test/resources/log4j2.xml (100%)


[tika] 01/02: TIKA-3782 -- improve logging and idcolumn handling in jdbc pipes iterator

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 382c1b3996c6100b1b822e6f19061b7dae3c3131
Author: tallison <ta...@apache.org>
AuthorDate: Fri Jun 3 09:04:04 2022 -0400

    TIKA-3782 -- improve logging and idcolumn handling in jdbc pipes iterator
---
 .../pipesiterator/jdbc/JDBCPipesIterator.java      | 66 +++++++++++++++++-----
 .../src/test/resources/log4j.properties            | 22 --------
 .../src/test/resources/log4j2.xml                  | 32 +++++++++++
 3 files changed, 83 insertions(+), 37 deletions(-)

diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/main/java/org/apache/tika/pipes/pipesiterator/jdbc/JDBCPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/main/java/org/apache/tika/pipes/pipesiterator/jdbc/JDBCPipesIterator.java
index a03f24653..5dafee713 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/main/java/org/apache/tika/pipes/pipesiterator/jdbc/JDBCPipesIterator.java
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/main/java/org/apache/tika/pipes/pipesiterator/jdbc/JDBCPipesIterator.java
@@ -77,6 +77,10 @@ public class JDBCPipesIterator extends PipesIterator implements Initializable {
     private String connection;
     private String select;
 
+    private int fetchSize = -1;
+
+    private int queryTimeoutSeconds = -1;
+
     private Connection db;
 
     @Field
@@ -118,6 +122,20 @@ public class JDBCPipesIterator extends PipesIterator implements Initializable {
         this.select = select;
     }
 
+    @Field
+    public void setFetchSize(int fetchSize) throws TikaConfigException {
+        if (fetchSize == 0) {
+            throw new TikaConfigException("Can't set fetch size == 0");
+        }
+        if (fetchSize < 0) {
+            LOGGER.info("fetch size < 0; no fetch size will be set");
+        }
+        this.fetchSize = fetchSize;
+    }
+
+    public void setQueryTimeoutSeconds(int seconds) {
+        this.queryTimeoutSeconds = seconds;
+    }
 
     @Override
     protected void enqueue() throws InterruptedException, IOException, TimeoutException {
@@ -129,6 +147,12 @@ public class JDBCPipesIterator extends PipesIterator implements Initializable {
         HandlerConfig handlerConfig = getHandlerConfig();
         LOGGER.debug("select: {}", select);
         try (Statement st = db.createStatement()) {
+            if (fetchSize > 0) {
+                st.setFetchSize(fetchSize);
+            }
+            if (queryTimeoutSeconds > 0) {
+                st.setQueryTimeout(queryTimeoutSeconds);
+            }
             try (ResultSet rs = st.executeQuery(select)) {
                 while (rs.next()) {
                     if (headers.size() == 0) {
@@ -166,11 +190,19 @@ public class JDBCPipesIterator extends PipesIterator implements Initializable {
 
         if (!StringUtils.isBlank(fetchKeyColumn) && fetchEmitKeyIndices.fetchKeyIndex < 0) {
             throw new IOException(
-                    new TikaConfigException("Couldn't find column: " + fetchKeyColumn));
+                    new TikaConfigException("Couldn't find fetchkey column: " + fetchKeyColumn));
         }
         if (!StringUtils.isBlank(emitKeyColumn) && fetchEmitKeyIndices.emitKeyIndex < 0) {
             throw new IOException(
-                    new TikaConfigException("Couldn't find column: " + emitKeyColumn));
+                    new TikaConfigException("Couldn't find emitKey column: " + emitKeyColumn));
+        }
+        if (!StringUtils.isBlank(idColumn) && fetchEmitKeyIndices.idIndex < 0) {
+            throw new IOException(
+                    new TikaConfigException("Couldn't find id column: " + idColumn));
+        }
+        if (StringUtils.isBlank(idColumn)) {
+            LOGGER.warn("id column is blank, using fetchkey column as the id column");
+            fetchEmitKeyIndices.idIndex = fetchEmitKeyIndices.fetchKeyIndex;
         }
     }
 
@@ -185,42 +217,46 @@ public class JDBCPipesIterator extends PipesIterator implements Initializable {
         String emitKey = "";
         String id = "";
         for (int i = 1; i <= rs.getMetaData().getColumnCount(); i++) {
-
+            //a single column can be the fetch key and the emit key, etc.
+            boolean isUsed = false;
             if (i == fetchEmitKeyIndices.fetchKeyIndex) {
                 fetchKey = getString(i, rs);
-                if (fetchKey == null) {
+                if (StringUtils.isBlank(fetchKey)) {
                     LOGGER.debug("fetchKey is empty for record " + toString(rs));
                 }
                 fetchKey = (fetchKey == null) ? "" : fetchKey;
-                continue;
+                isUsed = true;
             }
             if (i == fetchEmitKeyIndices.emitKeyIndex) {
                 emitKey = getString(i, rs);
-                if (emitKey == null) {
+                if (StringUtils.isBlank(emitKey)) {
                     LOGGER.debug("emitKey is empty for record " + toString(rs));
                 }
                 emitKey = (emitKey == null) ? "" : emitKey;
-                continue;
+                isUsed = true;
             }
             if (i == fetchEmitKeyIndices.idIndex) {
                 id = getString(i, rs);
-                if (id == null) {
+                if (StringUtils.isBlank(id)) {
                     LOGGER.warn("id is empty for record " + toString(rs));
                 }
                 id = (id == null) ? "" : id;
-                continue;
+                isUsed = true;
             }
             if (i == fetchEmitKeyIndices.fetchStartRangeIndex) {
                 fetchStartRange = getLong(i, rs);
-                continue;
+                isUsed = true;
             }
             if (i == fetchEmitKeyIndices.fetchEndRangeIndex) {
                 fetchEndRange = getLong(i, rs);
-                continue;
+                isUsed = true;
+
             }
-            String val = getString(i, rs);
-            if (val != null) {
-                metadata.set(headers.get(i - 1), val);
+            if (! isUsed) {
+                String val = getString(i, rs);
+                if (! StringUtils.isBlank(val)) {
+                    metadata.set(headers.get(i - 1), val);
+                }
             }
         }
 
@@ -324,7 +360,7 @@ public class JDBCPipesIterator extends PipesIterator implements Initializable {
     }
 
     private static class FetchEmitKeyIndices {
-        private final int idIndex;
+        private int idIndex;
         private final int fetchKeyIndex;
         private final int fetchStartRangeIndex;
         private final int fetchEndRangeIndex;
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/test/resources/log4j.properties b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/test/resources/log4j.properties
deleted file mode 100644
index 2b2da1abc..000000000
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#info,debug, error,fatal ...
-log4j.rootLogger=info,stderr
-#console
-log4j.appender.stderr=org.apache.log4j.ConsoleAppender
-log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
-log4j.appender.stderr.Target=System.err
-log4j.appender.stderr.layout.ConversionPattern=%-5p [%t]: %m%n
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/test/resources/log4j2.xml b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/test/resources/log4j2.xml
new file mode 100644
index 000000000..5f946e6e5
--- /dev/null
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/test/resources/log4j2.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<Configuration status="INFO">
+  <Appenders>
+    <Console name="console" target="SYSTEM_ERR">
+      <PatternLayout
+          pattern="%-5p [%t] %d{HH:mm:ss,SSS} %c %m%n" />
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Root level="info" additivity="false">
+      <AppenderRef ref="console" />
+    </Root>
+  </Loggers>
+</Configuration>


[tika] 02/02: TIKA-3785 -- align pipes-iterator-csv with pipes-jdbc behavior

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 184cf76023fd6a5236cb6b51ad7b1c077b8fd593
Author: tallison <ta...@apache.org>
AuthorDate: Fri Jun 3 09:19:29 2022 -0400

    TIKA-3785 -- align pipes-iterator-csv with pipes-jdbc behavior
---
 .../pipes/pipesiterator/csv/CSVPipesIterator.java  | 81 +++++++++++-----------
 1 file changed, 41 insertions(+), 40 deletions(-)

diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/main/java/org/apache/tika/pipes/pipesiterator/csv/CSVPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/main/java/org/apache/tika/pipes/pipesiterator/csv/CSVPipesIterator.java
index 17ea95420..8fb441d8b 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/main/java/org/apache/tika/pipes/pipesiterator/csv/CSVPipesIterator.java
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/main/java/org/apache/tika/pipes/pipesiterator/csv/CSVPipesIterator.java
@@ -123,12 +123,16 @@ public class CSVPipesIterator extends PipesIterator implements Initializable {
                 break;
             }
 
-            checkFetchEmitValidity(fetcherName, emitterName, fetchEmitKeyIndices, headers);
+            try {
+                checkFetchEmitValidity(fetcherName, emitterName, fetchEmitKeyIndices, headers);
+            } catch (TikaConfigException e) {
+                throw new IOException(e);
+            }
             HandlerConfig handlerConfig = getHandlerConfig();
             for (CSVRecord record : records) {
-                String id = getId(fetchEmitKeyIndices, record);
-                String fetchKey = getFetchKey(fetchEmitKeyIndices, record);
-                String emitKey = getEmitKey(fetchEmitKeyIndices, record);
+                String id = record.get(fetchEmitKeyIndices.idIndex);
+                String fetchKey = record.get(fetchEmitKeyIndices.fetchKeyIndex);
+                String emitKey = record.get(fetchEmitKeyIndices.emitKeyIndex);
                 if (StringUtils.isBlank(fetchKey) && !StringUtils.isBlank(fetcherName)) {
                     LOGGER.debug("Fetcher specified ({}), but no fetchkey was found in ({})",
                             fetcherName, record);
@@ -136,9 +140,7 @@ public class CSVPipesIterator extends PipesIterator implements Initializable {
                 if (StringUtils.isBlank(emitKey)) {
                     throw new IOException("emitKey must not be blank in :" + record);
                 }
-                if (StringUtils.isBlank(id) && ! StringUtils.isBlank(fetchKey)) {
-                    id = fetchKey;
-                }
+
                 Metadata metadata = loadMetadata(fetchEmitKeyIndices, headers, record);
                 tryToAdd(new FetchEmitTuple(id, new FetchKey(fetcherName, fetchKey),
                         new EmitKey(emitterName, emitKey), metadata, handlerConfig,
@@ -149,62 +151,51 @@ public class CSVPipesIterator extends PipesIterator implements Initializable {
 
     private void checkFetchEmitValidity(String fetcherName, String emitterName,
                                         FetchEmitKeyIndices fetchEmitKeyIndices,
-                                        List<String> headers) throws IOException {
+                                        List<String> headers) throws TikaConfigException {
 
         if (StringUtils.isBlank(emitterName)) {
-            throw new IOException(new TikaConfigException("must specify at least an emitterName"));
+            throw new TikaConfigException("must specify at least an emitterName");
         }
 
         if (StringUtils.isBlank(fetcherName) && !StringUtils.isBlank(fetchKeyColumn)) {
-            throw new IOException(new TikaConfigException("If specifying a 'fetchKeyColumn', " +
-                    "you must also specify a 'fetcherName'"));
+            throw new TikaConfigException("If specifying a 'fetchKeyColumn', " +
+                    "you must also specify a 'fetcherName'");
         }
 
         if (StringUtils.isBlank(fetcherName)) {
-            LOGGER.debug("No fetcher specified. This will be metadata only");
+            LOGGER.info("No fetcher specified. This will be metadata only");
         }
 
+        if (StringUtils.isBlank(fetchKeyColumn)) {
+            throw new TikaConfigException("must specify fetchKeyColumn");
+        }
         //if a fetchkeycolumn is specified, make sure that it was found
         if (!StringUtils.isBlank(fetchKeyColumn) && fetchEmitKeyIndices.fetchKeyIndex < 0) {
-            throw new IOException(new TikaConfigException(
+            throw new TikaConfigException(
                     "Couldn't find fetchKeyColumn (" + fetchKeyColumn + " in header.\n" +
-                            "These are the headers I see: " + headers));
+                            "These are the headers I see: " + headers);
         }
 
         //if an emitkeycolumn is specified, make sure that it was found
         if (!StringUtils.isBlank(emitKeyColumn) && fetchEmitKeyIndices.emitKeyIndex < 0) {
-            throw new IOException(new TikaConfigException(
+            throw new TikaConfigException(
                     "Couldn't find emitKeyColumn (" + emitKeyColumn + " in header.\n" +
-                            "These are the headers I see: " + headers));
+                            "These are the headers I see: " + headers);
+        }
+
+        //if an idcolumn is specified, make sure that it was found
+        if (!StringUtils.isBlank(idColumn) && fetchEmitKeyIndices.idIndex < 0) {
+            throw new TikaConfigException(
+                    "Couldn't find idColumn (" + idColumn + " in header.\n" +
+                            "These are the headers I see: " + headers);
         }
 
         if (StringUtils.isBlank(emitKeyColumn)) {
-            LOGGER.debug("No emitKeyColumn specified. " +
+            LOGGER.warn("No emitKeyColumn specified. " +
                             "Will use fetchKeyColumn ({}) for both the fetch key and emit key",
                     fetchKeyColumn);
         }
-    }
-
-    private String getId(FetchEmitKeyIndices fetchEmitKeyIndices, CSVRecord record) {
-        if (fetchEmitKeyIndices.idIndex > -1) {
-            return record.get(fetchEmitKeyIndices.idIndex);
-        }
-        return StringUtils.EMPTY;
-    }
-
-
-    private String getFetchKey(FetchEmitKeyIndices fetchEmitKeyIndices, CSVRecord record) {
-        if (fetchEmitKeyIndices.fetchKeyIndex > -1) {
-            return record.get(fetchEmitKeyIndices.fetchKeyIndex);
-        }
-        return StringUtils.EMPTY;
-    }
 
-    private String getEmitKey(FetchEmitKeyIndices fetchEmitKeyIndices, CSVRecord record) {
-        if (fetchEmitKeyIndices.emitKeyIndex > -1) {
-            return record.get(fetchEmitKeyIndices.emitKeyIndex);
-        }
-        return getFetchKey(fetchEmitKeyIndices, record);
     }
 
     private Metadata loadMetadata(FetchEmitKeyIndices fetchEmitKeyIndices, List<String> headers,
@@ -240,6 +231,16 @@ public class CSVPipesIterator extends PipesIterator implements Initializable {
                 idIndex = col;
             }
         }
+
+        if (StringUtils.isBlank(idColumn)) {
+            LOGGER.info("no idColumn specified, will use fetchKeyColumn");
+            idIndex = fetchKeyColumnIndex;
+        }
+
+        if (StringUtils.isBlank(emitKeyColumn)) {
+            LOGGER.info("no emitKeyColumn specified, will use fetchKeyColumn");
+            emitKeyColumnIndex = fetchKeyColumnIndex;
+        }
         return new FetchEmitKeyIndices(idIndex, fetchKeyColumnIndex, emitKeyColumnIndex);
     }
 
@@ -251,9 +252,9 @@ public class CSVPipesIterator extends PipesIterator implements Initializable {
     }
 
     private static class FetchEmitKeyIndices {
-        private final int idIndex;
+        private int idIndex;
         private final int fetchKeyIndex;
-        private final int emitKeyIndex;
+        private int emitKeyIndex;
 
         public FetchEmitKeyIndices(int idIndex, int fetchKeyIndex, int emitKeyIndex) {
             this.idIndex = idIndex;