You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2015/10/14 09:18:38 UTC
svn commit: r1708553 [1/2] - in /manifoldcf/branches/CONNECTORS-1162: ./
connectors/ connectors/kafka/ connectors/kafka/connector/
connectors/kafka/connector/src/ connectors/kafka/connector/src/main/
connectors/kafka/connector/src/main/java/ connectors...
Author: kwright
Date: Wed Oct 14 07:18:37 2015
New Revision: 1708553
URL: http://svn.apache.org/viewvc?rev=1708553&view=rev
Log:
Import kafka connector (except for a few wrong changes) from github
Added:
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/build.xml
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaConfig.java
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaMessage.java
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaOutputConnector.java
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/Messages.java
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_en_US.properties
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_ja_JP.properties
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_zh_CN.properties
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/editConfiguration.js
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/editConfiguration_Parameters.html
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/viewConfiguration.html
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/APISanityHSQLDBIT.java
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseHSQLDB.java
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseITHSQLDB.java
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BasePostgresql.java
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseUIHSQLDB.java
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/KafkaConnectorTest.java
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/KafkaLocal.java
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/NavigationHSQLDBUI.java
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/ZooKeeperLocal.java
manifoldcf/branches/CONNECTORS-1162/connectors/kafka/pom.xml
Modified:
manifoldcf/branches/CONNECTORS-1162/build.xml
manifoldcf/branches/CONNECTORS-1162/connectors/pom.xml
manifoldcf/branches/CONNECTORS-1162/framework/build.xml
Modified: manifoldcf/branches/CONNECTORS-1162/build.xml
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/build.xml?rev=1708553&r1=1708552&r2=1708553&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/build.xml (original)
+++ manifoldcf/branches/CONNECTORS-1162/build.xml Wed Oct 14 07:18:37 2015
@@ -1692,6 +1692,66 @@ Use Apache Forrest version forrest-0.9-d
<fileset dir="build/download/apache-manifoldcf-elasticsearch-1.5-plugin-bin/elasticsearch-1.5-plugin-mcf-2.0"/>
</copy>
</target>
+
+ <target name="download-kafka-client"> <!-- Downloads every jar listed below into lib/, one download-via-maven call per artifact -->
+ <mkdir dir="lib"/> <!-- make sure the download destination exists -->
+ <antcall target="download-via-maven">
+ <param name="target" value="lib"/>
+ <param name="project-path" value="org/apache/kafka"/>
+ <param name="artifact-version" value="0.8.2.1"/>
+ <param name="artifact-name" value="kafka-clients"/>
+ <param name="artifact-type" value="jar"/>
+ </antcall>
+ <antcall target="download-via-maven">
+ <param name="target" value="lib"/>
+ <param name="project-path" value="org/apache/kafka"/>
+ <param name="artifact-version" value="0.8.2.1"/> <!-- same version as kafka-clients above -->
+ <param name="artifact-name" value="kafka_2.11"/>
+ <param name="artifact-type" value="jar"/>
+ </antcall>
+ <antcall target="download-via-maven">
+ <param name="target" value="lib"/>
+ <param name="project-path" value="org/apache/commons"/>
+ <param name="artifact-version" value="3.4"/>
+ <param name="artifact-name" value="commons-lang3"/>
+ <param name="artifact-type" value="jar"/>
+ </antcall>
+ <antcall target="download-via-maven">
+ <param name="target" value="lib"/>
+ <param name="project-path" value="com/yammer/metrics"/>
+ <param name="artifact-version" value="2.2.0"/>
+ <param name="artifact-name" value="metrics-core"/>
+ <param name="artifact-type" value="jar"/>
+ </antcall>
+ <antcall target="download-via-maven">
+ <param name="target" value="lib"/>
+ <param name="project-path" value="org/scala-lang"/>
+ <param name="artifact-version" value="2.11.0"/> <!-- NOTE(review): presumably must stay compatible with the _2.11 artifacts in this target; confirm -->
+ <param name="artifact-name" value="scala-library"/>
+ <param name="artifact-type" value="jar"/>
+ </antcall>
+ <antcall target="download-via-maven">
+ <param name="target" value="lib"/>
+ <param name="project-path" value="net/sf/jopt-simple"/>
+ <param name="artifact-version" value="3.2"/>
+ <param name="artifact-name" value="jopt-simple"/>
+ <param name="artifact-type" value="jar"/>
+ </antcall>
+ <antcall target="download-via-maven">
+ <param name="target" value="lib"/>
+ <param name="project-path" value="org/scala-lang/modules"/>
+ <param name="artifact-version" value="1.0.4"/>
+ <param name="artifact-name" value="scala-xml_2.11"/>
+ <param name="artifact-type" value="jar"/>
+ </antcall>
+ <antcall target="download-via-maven">
+ <param name="target" value="lib"/>
+ <param name="project-path" value="org/scala-lang/modules"/>
+ <param name="artifact-version" value="1.0.4"/>
+ <param name="artifact-name" value="scala-parser-combinators_2.11"/>
+ <param name="artifact-type" value="jar"/>
+ </antcall>
+ </target>
<target name="download-dropbox-client">
<mkdir dir="lib"/>
@@ -2298,7 +2358,7 @@ Use Apache Forrest version forrest-0.9-d
</antcall>
</target>
- <target name="make-core-deps" depends="download-joda-time,download-aws-sdk,download-resteasy,download-jsoup,download-mockito,download-alfresco-webscript-plugin,download-alfresco-indexer-client,download-mongo-java-driver,download-jira-client,download-google-api-client,download-dropbox-client,download-solrj,download-zookeeper,download-httpcomponents,download-json,download-hsqldb,download-xerces,download-commons,download-elasticsearch-plugin,download-solr-plugins,download-sharepoint-plugins,download-jstl,download-xmlgraphics-commons,download-woodstox,download-xmlsec,download-xml-apis,download-wss4j,download-velocity,download-streambuffer,download-stax,download-servlet-api,download-xml-resolver,download-osgi,download-opensaml,download-mimepull,download-mail,download-log4j,download-junit,download-jaxws,download-glassfish,download-jaxb,download-tomcat,download-h2,download-h2-support,download-geronimo-specs,download-fop,download-postgresql,download-axis,download-saaj,download-wsdl4j,download-castor,download-jetty,download-slf4j,download-xalan,download-activation,download-avalon-framework,download-poi,download-chemistry,download-ecj,download-hadoop,download-htrace,download-protobuf,download-tika,download-jackson">
+ <target name="make-core-deps" depends="download-kafka-client,download-joda-time,download-aws-sdk,download-resteasy,download-jsoup,download-mockito,download-alfresco-webscript-plugin,download-alfresco-indexer-client,download-mongo-java-driver,download-jira-client,download-google-api-client,download-dropbox-client,download-solrj,download-zookeeper,download-httpcomponents,download-json,download-hsqldb,download-xerces,download-commons,download-elasticsearch-plugin,download-solr-plugins,download-sharepoint-plugins,download-jstl,download-xmlgraphics-commons,download-woodstox,download-xmlsec,download-xml-apis,download-wss4j,download-velocity,download-streambuffer,download-stax,download-servlet-api,download-xml-resolver,download-osgi,download-opensaml,download-mimepull,download-mail,download-log4j,download-junit,download-jaxws,download-glassfish,download-jaxb,download-tomcat,download-h2,download-h2-support,download-geronimo-specs,download-fop,download-postgresql,download-axis,download-saaj,download-wsdl4j,download-castor,download-jetty,download-slf4j,download-xalan,download-activation,download-avalon-framework,download-poi,download-chemistry,download-ecj,download-hadoop,download-htrace,download-protobuf,download-tika,download-jackson">
<copy todir="lib">
<fileset dir="lib-license" includes="*.txt"/>
</copy>
@@ -2354,4 +2414,4 @@ Use Apache Forrest version forrest-0.9-d
<ant dir="site" target="download-cleanup"/>
</target>
-</project>
+</project>
\ No newline at end of file
Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/build.xml
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/build.xml?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/build.xml (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/build.xml Wed Oct 14 07:18:37 2015
@@ -0,0 +1,48 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project name="kafka" default="all"> <!-- Ant build for the Kafka output connector; the default target is "all" -->
+ <presetdef name="javac"> <!-- every javac call in this build gets includeantruntime="false" -->
+ <javac includeantruntime="false" />
+ </presetdef>
+
+ <property environment="env"/> <!-- exposes environment variables as env.* properties -->
+ <condition property="mcf-dist" value="${env.MCFDISTPATH}"> <!-- mcf-dist comes from MCFDISTPATH when that variable is set -->
+ <isset property="env.MCFDISTPATH"/>
+ </condition>
+ <property name="abs-dist" location="../../dist"/>
+ <condition property="mcf-dist" value="${abs-dist}"> <!-- otherwise mcf-dist falls back to ../../dist -->
+ <not>
+ <isset property="env.MCFDISTPATH"/>
+ </not>
+ </condition>
+
+ <import file="${mcf-dist}/connector-build.xml"/> <!-- build file imported from the resolved mcf-dist directory -->
+
+ <target name="deliver-connector" depends="mcf-connector-build.deliver-connector"> <!-- runs the imported deliver-connector first, then registers the output connector below -->
+ <antcall target="general-add-output-connector">
+ <param name="connector-label" value="Kafka"/>
+ <param name="connector-class" value="org.apache.manifoldcf.agents.output.kafka.KafkaOutputConnector"/>
+ </antcall>
+ </target>
+
+ <path id="connector-test-classpath"> <!-- the imported test classpath plus the jars in ../../lib -->
+ <path refid="mcf-connector-build.connector-test-classpath"/>
+ <fileset file="../../lib/*.jar"/> <!-- NOTE(review): uses a wildcard in the file attribute; presumably meant as dir="../../lib" includes="*.jar"; confirm -->
+ </path>
+
+</project>
Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaConfig.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaConfig.java?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaConfig.java (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaConfig.java Wed Oct 14 07:18:37 2015
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.manifoldcf.agents.output.kafka;
+
+public class KafkaConfig {
+
+  /** Name of the configuration parameter that holds the Kafka host/IP. */
+  public static final String IP = "ip";
+  /** Default value for the {@link #IP} parameter. */
+  public static final String IP_DEFAULT = "localhost";
+
+  /** Name of the configuration parameter that holds the Kafka port. */
+  public static final String PORT = "port";
+  /** Default value for the {@link #PORT} parameter. */
+  public static final String PORT_DEFAULT = "9092";
+
+  /** Name of the configuration parameter that holds the Kafka topic. */
+  public static final String TOPIC = "topic";
+  /** Default value for the {@link #TOPIC} parameter. */
+  public static final String TOPIC_DEFAULT = "topic";
+}
Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaMessage.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaMessage.java?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaMessage.java (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaMessage.java Wed Oct 14 07:18:37 2015
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.manifoldcf.agents.output.kafka;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import org.apache.commons.io.IOUtils;
+import org.apache.manifoldcf.agents.interfaces.RepositoryDocument;
+import static org.apache.manifoldcf.agents.output.kafka.KafkaOutputConnector.allowAttributeName;
+import static org.apache.manifoldcf.agents.output.kafka.KafkaOutputConnector.denyAttributeName;
+import static org.apache.manifoldcf.agents.output.kafka.KafkaOutputConnector.noSecurityToken;
+import static org.apache.manifoldcf.agents.output.kafka.KafkaOutputConnector.useNullValue;
+import org.apache.manifoldcf.core.common.Base64;
+
+/**
+ * Builds the JSON message for a {@link RepositoryDocument}: the document's
+ * fields, the allow/deny security token fields, and (when the document has a
+ * binary stream) a "file" object carrying the Base64-encoded content.
+ *
+ * @author tugba
+ */
+public class KafkaMessage {
+
+  // NOTE(review): none of these ACL arrays is ever assigned, so createJSON()
+  // always hands null to writeACLs(). Presumably they were meant to be filled
+  // from the document's security information; confirm.
+  private final String[] acls = null;
+  private final String[] denyAcls = null;
+  private final String[] shareAcls = null;
+  private final String[] shareDenyAcls = null;
+  private final String[] parentAcls = null;
+  private final String[] parentDenyAcls = null;
+
+  /**
+   * Serializes the given document as a JSON object encoded in UTF-8.
+   *
+   * The object contains one member per document field, the allow/deny token
+   * members for the "document", "share" and "parent" levels, and, if the
+   * document has a binary stream, a "file" member with the optional content
+   * type, the optional file name and the Base64-encoded stream content.
+   *
+   * @param document the document to serialize.
+   * @return the bytes written to the in-memory buffer. If serialization
+   * fails part-way, the exception is only printed and whatever had already
+   * reached the buffer is returned.
+   */
+  public byte[] createJSON(RepositoryDocument document) {
+    // Everything is written through a UTF-8 writer into this in-memory buffer.
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    try {
+      // Kept local so that no per-document state lingers on the instance.
+      InputStream inputStream = document.getBinaryStream();
+
+      PrintWriter pw = new PrintWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
+
+      pw.print("{");
+      boolean needComma = false;
+      Iterator<String> i = document.getFields();
+      while (i.hasNext()) {
+        String fieldName = i.next();
+        String[] fieldValues = document.getFieldAsStrings(fieldName);
+        needComma = writeField(pw, needComma, fieldName, fieldValues);
+      }
+
+      needComma = writeACLs(pw, needComma, "document", acls, denyAcls);
+      needComma = writeACLs(pw, needComma, "share", shareAcls, shareDenyAcls);
+      needComma = writeACLs(pw, needComma, "parent", parentAcls, parentDenyAcls);
+
+      if (inputStream != null) {
+        if (needComma) {
+          pw.print(",");
+        }
+        // A "type" : "attachment" member is deliberately not written:
+        // see CONNECTORS-690.
+        pw.print("\"file\" : {");
+        String contentType = document.getMimeType();
+        if (contentType != null) {
+          pw.print("\"_content_type\" : " + jsonStringEscape(contentType) + ",");
+        }
+        String fileName = document.getFileName();
+        if (fileName != null) {
+          pw.print("\"_name\" : " + jsonStringEscape(fileName) + ",");
+        }
+        // Since ES 1.0
+        pw.print(" \"_content\" : \"");
+        Base64 base64 = new Base64();
+        base64.encodeStream(inputStream, pw);
+        pw.print("\"}");
+      }
+      pw.print("}");
+      pw.flush();
+      IOUtils.closeQuietly(pw);
+    } catch (Exception e) {
+      // NOTE(review): the failure is only printed and the caller still gets
+      // the (possibly incomplete) bytes written so far. Kept as-is because
+      // the signature declares no exception; consider propagating instead.
+      e.printStackTrace();
+    }
+    return out.toByteArray();
+  }
+
+  /**
+   * Writes one JSON member for a field.
+   *
+   * A single value is written as a JSON string, several values as a JSON
+   * array of strings. Nothing is written for a null or empty value array.
+   *
+   * @param pw the writer receiving the JSON text.
+   * @param needComma whether a member has already been written, so that a
+   * separating comma must precede this one.
+   * @param fieldName the member name.
+   * @param fieldValues the member values; may be null.
+   * @return true if a comma is needed before the next member.
+   */
+  protected static boolean writeField(PrintWriter pw, boolean needComma,
+    String fieldName, String[] fieldValues)
+    throws IOException {
+    if (fieldValues == null || fieldValues.length == 0) {
+      return needComma;
+    }
+
+    if (needComma) {
+      pw.print(",");
+    }
+
+    if (fieldValues.length == 1) {
+      pw.print(jsonStringEscape(fieldName) + " : " + jsonStringEscape(fieldValues[0]));
+      return true;
+    }
+
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    for (int j = 0; j < fieldValues.length; j++) {
+      if (j > 0) {
+        sb.append(",");
+      }
+      sb.append(jsonStringEscape(fieldValues[j]));
+    }
+    sb.append("]");
+    pw.print(jsonStringEscape(fieldName) + " : " + sb.toString());
+    return true;
+  }
+
+  /**
+   * Output an acl level: the allow-token member followed by the deny-token
+   * member for the given level.
+   *
+   * When a token array is null or empty, the no-security token is written in
+   * its place unless useNullValue is set, in which case the member is omitted.
+   *
+   * @param pw the writer receiving the JSON text.
+   * @param needComma whether a comma must precede the next member.
+   * @param aclType the level name appended to the allow/deny attribute names.
+   * @param acl the allow tokens; may be null.
+   * @param denyAcl the deny tokens; may be null.
+   * @return true if a comma is needed before the next member.
+   */
+  protected static boolean writeACLs(PrintWriter pw, boolean needComma,
+    String aclType, String[] acl, String[] denyAcl)
+    throws IOException {
+    String metadataACLName = allowAttributeName + aclType;
+    if (acl != null && acl.length > 0) {
+      needComma = writeField(pw, needComma, metadataACLName, acl);
+    } else if (!useNullValue) {
+      needComma = writeField(pw, needComma, metadataACLName, new String[]{noSecurityToken});
+    }
+    String metadataDenyACLName = denyAttributeName + aclType;
+    if (denyAcl != null && denyAcl.length > 0) {
+      needComma = writeField(pw, needComma, metadataDenyACLName, denyAcl);
+    } else if (!useNullValue) {
+      needComma = writeField(pw, needComma, metadataDenyACLName, new String[]{noSecurityToken});
+    }
+    return needComma;
+  }
+
+  /**
+   * Returns the value as a double-quoted JSON string literal.
+   *
+   * Quote, backslash and slash are backslash-escaped, the usual whitespace
+   * controls get their short escapes, and every other character below U+0020
+   * is written as a \\u00XX escape, because JSON does not allow raw control
+   * characters inside a string.
+   *
+   * @param value the text to quote; must not be null.
+   * @return the quoted and escaped text.
+   */
+  protected static String jsonStringEscape(String value) {
+    StringBuilder sb = new StringBuilder(value.length() + 2);
+    sb.append('"');
+    for (int i = 0; i < value.length(); i++) {
+      char x = value.charAt(i);
+      switch (x) {
+        case '\n':
+          sb.append('\\').append('n');
+          break;
+        case '\r':
+          sb.append('\\').append('r');
+          break;
+        case '\t':
+          sb.append('\\').append('t');
+          break;
+        case '\b':
+          sb.append('\\').append('b');
+          break;
+        case '\f':
+          sb.append('\\').append('f');
+          break;
+        case '"':
+        case '\\':
+        case '/':
+          sb.append('\\').append(x);
+          break;
+        default:
+          if (x < 0x20) {
+            // Remaining control characters: x < 0x20, so the escape is
+            // always \u00 followed by two hex digits.
+            sb.append("\\u00")
+              .append(Character.forDigit(x >> 4, 16))
+              .append(Character.forDigit(x & 0xF, 16));
+          } else {
+            sb.append(x);
+          }
+          break;
+      }
+    }
+    sb.append('"');
+    return sb.toString();
+  }
+}
Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaOutputConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaOutputConnector.java?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaOutputConnector.java (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaOutputConnector.java Wed Oct 14 07:18:37 2015
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.manifoldcf.agents.output.kafka;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.agents.interfaces.*;
+
+import java.util.*;
+import java.io.*;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.PartitionInfo;
+
+/**
+ * This is a kafka output connector.
+ */
+public class KafkaOutputConnector extends org.apache.manifoldcf.agents.output.BaseOutputConnector {
+
+ /** Revision-control identification string. */
+ public static final String _rcsid = "@(#)$Id: KafkaOutputConnector.java 988245 2010-08-23 18:39:35Z kwright $";
+
+ // ---- Names of the activities this connector reports ----
+
+ /** Activity name: document ingestion. */
+ public static final String INGEST_ACTIVITY = "document ingest";
+
+ /** Activity name: job notification. */
+ public static final String JOB_COMPLETE_ACTIVITY = "output notification";
+
+ // ---- Configuration UI resources ----
+
+ /** Message key of the label of the parameters tab. */
+ private static final String KAFKA_TAB_PARAMETERS = "KafkaConnector.Parameters";
+
+ /** Javascript resource that checks the configuration parameters. */
+ private static final String EDIT_CONFIG_HEADER_FORWARD = "editConfiguration.js";
+
+ /** HTML template used to edit the configuration parameters. */
+ private static final String EDIT_CONFIG_FORWARD_PARAMETERS = "editConfiguration_Parameters.html";
+
+ /** HTML template used to view the configuration parameters. */
+ private static final String VIEW_CONFIG_FORWARD = "viewConfiguration.html";
+
+ // ---- Field names ----
+
+ /** Field name for the file body text. */
+ private static final String FILE_BODY_TEXT_FIELDNAME = "f_bodytext";
+
+ /** Field name for the document's URI. */
+ private static final String DOCUMENT_URI_FIELDNAME = "document_URI";
+
+ // ---- Security token output ----
+
+ /** Prefix of the allow-token attribute names. */
+ protected static final String allowAttributeName = "allow_token_";
+
+ /** Prefix of the deny-token attribute names. */
+ protected static final String denyAttributeName = "deny_token_";
+
+ /** Token standing in for "no security". */
+ protected static final String noSecurityToken = "__nosecurity__";
+
+ /** When false, the no-security token is written in place of missing tokens. */
+ protected static final boolean useNullValue = false;
+
+ /** The Kafka producer; null until connect() or setProducer() supplies one. */
+ KafkaProducer producer;
+
+ /**
+  * Creates the connector. No Kafka producer exists yet at this point.
+  */
+ public KafkaOutputConnector() {
+   super();
+ }
+
+ /**
+  * Replaces the Kafka producer used by this connector.
+  *
+  * @param producer the producer to use from now on.
+  */
+ public void setProducer(KafkaProducer producer) {
+   this.producer = producer;
+ }
+
+ /**
+  * Lists the activities this connector supports, i.e. the activity names it
+  * writes into the log.
+  *
+  * @return the ingest activity followed by the job notification activity.
+  */
+ @Override
+ public String[] getActivitiesList() {
+   final String[] supportedActivities = {INGEST_ACTIVITY, JOB_COMPLETE_ACTIVITY};
+   return supportedActivities;
+ }
+
+ /**
+  * Connect: hand the configuration to the base class and create the Kafka
+  * producer.
+  *
+  * The bootstrap address is built as "host:port" from the "ip" and "port"
+  * parameters. A parameter that is absent falls back to its KafkaConfig
+  * default, so the producer is never pointed at the literal address
+  * "null:null".
+  *
+  * @param configParameters is the set of configuration parameters, which in
+  * this case hold the Kafka host, port and topic.
+  */
+ @Override
+ public void connect(ConfigParams configParameters) {
+   super.connect(configParameters);
+
+   String host = params.getParameter(KafkaConfig.IP);
+   if (host == null) {
+     host = KafkaConfig.IP_DEFAULT;
+   }
+   String port = params.getParameter(KafkaConfig.PORT);
+   if (port == null) {
+     port = KafkaConfig.PORT_DEFAULT;
+   }
+
+   Properties props = new Properties();
+   props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, host + ":" + port);
+   props.put(ProducerConfig.RETRIES_CONFIG, "3");
+   props.put(ProducerConfig.ACKS_CONFIG, "all");
+   props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
+   props.put(ProducerConfig.BATCH_SIZE_CONFIG, 200);
+   props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, true);
+   // Keys are sent as strings, values as raw bytes.
+   props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+   props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+   producer = new KafkaProducer(props);
+ }
+
+ /**
+  * Close the connection. Call this before discarding the connection.
+  *
+  * Closes the Kafka producer, if there is one, so that it does not outlive
+  * the connection, and then lets the base class disconnect.
+  *
+  * @throws ManifoldCFException if the base class fails to disconnect.
+  */
+ @Override
+ public void disconnect()
+   throws ManifoldCFException {
+   try {
+     if (producer != null) {
+       producer.close();
+     }
+   } finally {
+     // Drop the reference and run the base-class disconnect even if
+     // closing the producer failed.
+     producer = null;
+     super.disconnect();
+   }
+ }
+
+ /**
+  * Fill in a Server tab configuration parameter map for calling a Velocity
+  * template.
+  *
+  * The map receives the entries "IP", "PORT" and "TOPIC". A parameter that
+  * is not set is replaced by its default from KafkaConfig.
+  *
+  * @param newMap is the map to fill in
+  * @param mapper is the password mapper; it is not used by this method
+  * @param parameters is the current set of configuration parameters
+  */
+ private static void fillInServerConfigurationMap(Map<String, Object> newMap, IPasswordMapperActivity mapper, ConfigParams parameters) {
+   String IP = parameters.getParameter(KafkaConfig.IP);
+   String port = parameters.getParameter(KafkaConfig.PORT);
+   String topic = parameters.getParameter(KafkaConfig.TOPIC);
+
+   // Use the shared defaults rather than repeating the literals here.
+   if (IP == null) {
+     IP = KafkaConfig.IP_DEFAULT;
+   }
+   if (port == null) {
+     port = KafkaConfig.PORT_DEFAULT;
+   }
+   if (topic == null) {
+     topic = KafkaConfig.TOPIC_DEFAULT;
+   }
+
+   newMap.put("IP", IP);
+   newMap.put("PORT", port);
+   newMap.put("TOPIC", topic);
+ }
+
+ /**
+  * Outputs the header part of the configuration UI: registers the
+  * connector's tab and emits the Javascript resource.
+  *
+  * @param threadContext is the thread context.
+  * @param out is the output the resource is written to.
+  * @param locale is the locale used for messages and resources.
+  * @param parameters are the current configuration parameters.
+  * @param tabsArray is the list of tab names; this connector's tab is appended.
+  */
+ @Override
+ public void outputConfigurationHeader(IThreadContext threadContext,
+   IHTTPOutput out, Locale locale, ConfigParams parameters,
+   List<String> tabsArray) throws ManifoldCFException, IOException {
+   // Register this connector's tab.
+   final String tabLabel = Messages.getString(locale, KAFKA_TAB_PARAMETERS);
+   tabsArray.add(tabLabel);
+
+   // Build the template context from the current parameters.
+   final Map<String, Object> velocityContext = new HashMap<String, Object>();
+   fillInServerConfigurationMap(velocityContext, out, parameters);
+
+   // A single Javascript template serves all tabs.
+   Messages.outputResourceWithVelocity(out, locale, EDIT_CONFIG_HEADER_FORWARD, velocityContext);
+ }
+
+ /**
+  * Outputs the body part of the configuration UI using the parameter
+  * editing template.
+  *
+  * @param threadContext is the thread context.
+  * @param out is the output the resource is written to.
+  * @param locale is the locale used for resources.
+  * @param parameters are the current configuration parameters.
+  * @param tabName is the tab name, passed to the template as "TABNAME".
+  */
+ @Override
+ public void outputConfigurationBody(IThreadContext threadContext,
+   IHTTPOutput out, Locale locale, ConfigParams parameters, String tabName)
+   throws ManifoldCFException, IOException {
+   final Map<String, Object> velocityContext = new HashMap<String, Object>();
+
+   // The template receives the tab name along with the parameter values.
+   velocityContext.put("TABNAME", tabName);
+   fillInServerConfigurationMap(velocityContext, out, parameters);
+
+   Messages.outputResourceWithVelocity(out, locale, EDIT_CONFIG_FORWARD_PARAMETERS, velocityContext);
+ }
+
+ /**
+  * Outputs the read-only view of the configuration using the view template.
+  *
+  * @param threadContext is the thread context.
+  * @param out is the output the resource is written to.
+  * @param locale is the locale used for resources.
+  * @param parameters are the current configuration parameters.
+  */
+ @Override
+ public void viewConfiguration(IThreadContext threadContext, IHTTPOutput out,
+   Locale locale, ConfigParams parameters) throws ManifoldCFException,
+   IOException {
+   final Map<String, Object> velocityContext = new HashMap<String, Object>();
+   fillInServerConfigurationMap(velocityContext, out, parameters);
+   Messages.outputResourceWithVelocity(out, locale, VIEW_CONFIG_FORWARD, velocityContext);
+ }
+
+ /**
+  * Copies the posted host, port and topic values into the configuration
+  * parameters. A value that was not posted leaves its parameter untouched.
+  *
+  * @param threadContext is the thread context.
+  * @param variableContext provides the posted form values.
+  * @param parameters are the configuration parameters to update.
+  * @return always null.
+  */
+ @Override
+ public String processConfigurationPost(IThreadContext threadContext,
+   IPostParameters variableContext, ConfigParams parameters)
+   throws ManifoldCFException {
+   // The form fields carry the same names as the configuration parameters.
+   final String[] postedKeys = {KafkaConfig.IP, KafkaConfig.PORT, KafkaConfig.TOPIC};
+   for (String key : postedKeys) {
+     final String postedValue = variableContext.getParameter(key);
+     if (postedValue != null) {
+       parameters.setParameter(key, postedValue);
+     }
+   }
+   return null;
+ }
+
+  /**
+   * Test the connection. Returns a string describing the connection integrity.
+   *
+   * The test asks the producer for the partition list of the configured topic
+   * and, when that call succeeds, returns whatever the superclass check
+   * reports. Any failure is turned into a "Connection failed: ..." string
+   * rather than propagated, so the caller always gets a displayable status.
+   *
+   * @return the connection's status as a displayable string.
+   */
+  @Override
+  public String check()
+    throws ManifoldCFException {
+    try {
+      // Only the success or failure of the lookup matters; the partition list is not used.
+      // The topic is resolved the same way as on the send path, with the same default.
+      producer.partitionsFor(getConfig(params, KafkaConfig.TOPIC, "topic"));
+      return super.check();
+    } catch (ManifoldCFException e) {
+      return "Connection failed: " + e.getMessage();
+    } catch (RuntimeException e) {
+      // The Kafka client reports failures with unchecked exceptions; without this
+      // clause they would escape instead of being shown as a status.
+      return "Connection failed: " + e.getMessage();
+    }
+  }
+
+  /**
+   * Build the pipeline description for an output specification. The version
+   * string inside the returned description is what the Connector Framework
+   * compares to decide whether a document has to be sent to the output data
+   * store again. The document's own content cannot be taken into account
+   * here; a separate version string (defined in IRepositoryConnector)
+   * describes the version of the actual document.
+   *
+   * This method presumes that the connector object has been configured, and
+   * is thus able to communicate with the output data store should that be
+   * necessary.
+   *
+   * NOTE(review): the version string is always empty here, so it looks like
+   * configuration changes never force documents to be re-sent - confirm that
+   * this is intended.
+   *
+   * @param spec is the current output specification for the job that is doing
+   * the crawling.
+   * @return a VersionContext holding the (empty) version string together with
+   * the connector parameters and the given specification.
+   */
+  @Override
+  public VersionContext getPipelineDescription(Specification spec)
+    throws ManifoldCFException, ServiceInterruption {
+    final String versionString = "";
+    return new VersionContext(versionString, params, spec);
+  }
+
+  /**
+   * Add (or replace) a document in the output data store using the connector.
+   * The document is converted to a JSON payload, published to the configured
+   * Kafka topic, and the call blocks until the send has completed.
+   * This method presumes that the connector object has been configured, and it
+   * is thus able to communicate with the output data store should that be
+   * necessary.
+   *
+   * @param documentURI is the URI of the document. The URI is presumed to be
+   * the unique identifier which the output data store will use to process and
+   * serve the document. This URI is constructed by the repository connector
+   * which fetches the document, and is thus universal across all output
+   * connectors.
+   * @param outputDescription includes the description string that was
+   * constructed for this document by the getPipelineDescription() method.
+   * @param document is the document data to be processed (handed to the output
+   * data store).
+   * @param authorityNameString is the name of the authority responsible for
+   * authorizing any access tokens passed in with the repository document. May
+   * be null.
+   * @param activities is the handle to an object that the implementer of a
+   * pipeline connector may use to perform operations, such as logging
+   * processing activity, or sending a modified document to the next stage in
+   * the pipeline.
+   * @return the document status (accepted or permanently rejected).
+   * @throws ManifoldCFException with code INTERRUPTED when the thread is
+   * interrupted while waiting for the send, or a plain ManifoldCFException
+   * wrapping the underlying cause when the send itself fails.
+   * @throws IOException only if there's a stream error reading the document
+   * data.
+   */
+  @Override
+  public int addOrReplaceDocumentWithException(String documentURI, VersionContext outputDescription, RepositoryDocument document, String authorityNameString, IOutputAddActivity activities)
+    throws ManifoldCFException, ServiceInterruption, IOException {
+    try {
+      // Get document info in JSON format
+      byte[] payload = new KafkaMessage().createJSON(document);
+      String topic = getConfig(params, KafkaConfig.TOPIC, "topic");
+
+      // Raw type kept on purpose: the producer field's type parameters are declared elsewhere in the class.
+      ProducerRecord record = new ProducerRecord(topic, payload);
+      // get() blocks until the send completes, so failures surface here.
+      producer.send(record).get();
+    } catch (InterruptedException e) {
+      // Previously the exception object was created but never thrown.
+      throw new ManifoldCFException("interrupted", e, ManifoldCFException.INTERRUPTED);
+    } catch (ExecutionException e) {
+      // A failed send is not an interruption; report the real cause and do not
+      // fall through to the "OK" activity record below.
+      final Throwable cause = (e.getCause() != null) ? e.getCause() : e;
+      throw new ManifoldCFException("Kafka send failed for '" + documentURI + "': " + cause.getMessage(), cause);
+    }
+
+    // Only reached when the send succeeded.
+    activities.recordActivity(null, INGEST_ACTIVITY, Long.valueOf(document.getBinaryLength()), documentURI, "OK", null);
+    return DOCUMENTSTATUS_ACCEPTED;
+  }
+
+  /**
+   * Look up a configuration parameter, falling back to a default.
+   *
+   * @param config is the parameter set to read from; may be null.
+   * @param parameter is the name of the parameter to read.
+   * @param defaultValue is returned when config is null or holds no value
+   * for the parameter.
+   * @return the configured value, or defaultValue when none is available.
+   */
+  private static String getConfig(ConfigParams config,
+    String parameter,
+    String defaultValue) {
+    final String configured = (config == null) ? null : config.getParameter(parameter);
+    return (configured == null) ? defaultValue : configured;
+  }
+
+  /**
+   * Notify the connector of a completed job. The framework calls this
+   * whenever a job is either completed or aborted, giving the connector a
+   * chance to flush internal state or to tell the output repository to
+   * synchronize. This implementation only records a job-complete activity
+   * with result code "OK".
+   *
+   * @param activities is the handle to an object that the implementer of an
+   * output connector may use to perform operations, such as logging processing
+   * activity.
+   */
+  @Override
+  public void noteJobComplete(IOutputNotifyActivity activities)
+    throws ManifoldCFException, ServiceInterruption {
+    final String entityURI = "";
+    final String resultCode = "OK";
+    // No start time, data size or result description is recorded for this event.
+    activities.recordActivity(null, JOB_COMPLETE_ACTIVITY, null, entityURI, resultCode, null);
+  }
+}
Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/Messages.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/Messages.java?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/Messages.java (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/Messages.java Wed Oct 14 07:18:37 2015
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.manifoldcf.agents.output.kafka;
+
+import java.util.Locale;
+import java.util.Map;
+import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
+import org.apache.manifoldcf.core.interfaces.IHTTPOutput;
+
+public class Messages extends org.apache.manifoldcf.ui.i18n.Messages {
+
+ public static final String DEFAULT_BUNDLE_NAME = "org.apache.manifoldcf.agents.output.kafka.common"; // bundle name passed to every message lookup below
+ public static final String DEFAULT_PATH_NAME = "org.apache.manifoldcf.agents.output.kafka"; // path name passed to the resource output methods below
+
+ /**
+ * Constructor - do not instantiate.
+ *
+ * This class only offers static convenience methods. Each one forwards to
+ * the method of the same name with more arguments, filling in this
+ * connector's DEFAULT_BUNDLE_NAME / DEFAULT_PATH_NAME and Messages.class,
+ * so callers only need to supply the locale and the message or resource key.
+ */
+ protected Messages() {
+ }
+
+ public static String getString(Locale locale, String messageKey) {
+ return getString(DEFAULT_BUNDLE_NAME, locale, messageKey, null);
+ }
+
+ public static String getAttributeString(Locale locale, String messageKey) {
+ return getAttributeString(DEFAULT_BUNDLE_NAME, locale, messageKey, null);
+ }
+
+ public static String getBodyString(Locale locale, String messageKey) {
+ return getBodyString(DEFAULT_BUNDLE_NAME, locale, messageKey, null);
+ }
+
+ public static String getAttributeJavascriptString(Locale locale, String messageKey) {
+ return getAttributeJavascriptString(DEFAULT_BUNDLE_NAME, locale, messageKey, null);
+ }
+
+ public static String getBodyJavascriptString(Locale locale, String messageKey) {
+ return getBodyJavascriptString(DEFAULT_BUNDLE_NAME, locale, messageKey, null);
+ }
+
+ public static String getString(Locale locale, String messageKey, Object[] args) {
+ return getString(DEFAULT_BUNDLE_NAME, locale, messageKey, args);
+ }
+
+ public static String getAttributeString(Locale locale, String messageKey, Object[] args) {
+ return getAttributeString(DEFAULT_BUNDLE_NAME, locale, messageKey, args);
+ }
+
+ public static String getBodyString(Locale locale, String messageKey, Object[] args) {
+ return getBodyString(DEFAULT_BUNDLE_NAME, locale, messageKey, args);
+ }
+
+ public static String getAttributeJavascriptString(Locale locale, String messageKey, Object[] args) {
+ return getAttributeJavascriptString(DEFAULT_BUNDLE_NAME, locale, messageKey, args);
+ }
+
+ public static String getBodyJavascriptString(Locale locale, String messageKey, Object[] args) {
+ return getBodyJavascriptString(DEFAULT_BUNDLE_NAME, locale, messageKey, args);
+ }
+
+ // More general methods which allow the bundle name to be specified; they pass Messages.class on to the five-argument methods, which are not declared in this class.
+ public static String getString(String bundleName, Locale locale, String messageKey, Object[] args) {
+ return getString(Messages.class, bundleName, locale, messageKey, args);
+ }
+
+ public static String getAttributeString(String bundleName, Locale locale, String messageKey, Object[] args) {
+ return getAttributeString(Messages.class, bundleName, locale, messageKey, args);
+ }
+
+ public static String getBodyString(String bundleName, Locale locale, String messageKey, Object[] args) {
+ return getBodyString(Messages.class, bundleName, locale, messageKey, args);
+ }
+
+ public static String getAttributeJavascriptString(String bundleName, Locale locale, String messageKey, Object[] args) {
+ return getAttributeJavascriptString(Messages.class, bundleName, locale, messageKey, args);
+ }
+
+ public static String getBodyJavascriptString(String bundleName, Locale locale, String messageKey, Object[] args) {
+ return getBodyJavascriptString(Messages.class, bundleName, locale, messageKey, args);
+ }
+
+ // Resource output: each method forwards to the like-named method with more arguments, adding Messages.class and the connector's default bundle/path names.
+ public static void outputResource(IHTTPOutput output, Locale locale, String resourceKey,
+ Map<String, String> substitutionParameters, boolean mapToUpperCase)
+ throws ManifoldCFException {
+ outputResource(output, Messages.class, DEFAULT_PATH_NAME, locale, resourceKey,
+ substitutionParameters, mapToUpperCase);
+ }
+
+ public static void outputResourceWithVelocity(IHTTPOutput output, Locale locale, String resourceKey,
+ Map<String, String> substitutionParameters, boolean mapToUpperCase)
+ throws ManifoldCFException {
+ outputResourceWithVelocity(output, Messages.class, DEFAULT_BUNDLE_NAME, DEFAULT_PATH_NAME, locale, resourceKey,
+ substitutionParameters, mapToUpperCase);
+ }
+
+ public static void outputResourceWithVelocity(IHTTPOutput output, Locale locale, String resourceKey,
+ Map<String, Object> contextObjects)
+ throws ManifoldCFException {
+ outputResourceWithVelocity(output, Messages.class, DEFAULT_BUNDLE_NAME, DEFAULT_PATH_NAME, locale, resourceKey,
+ contextObjects);
+ }
+
+}
Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_en_US.properties
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_en_US.properties?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_en_US.properties (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_en_US.properties Wed Oct 14 07:18:37 2015
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Add stuff as needed
+KafkaConnector.Parameters=Parameters
+
+KafkaConnector.IPColon=IP:
+KafkaConnector.PortColon=Port:
+KafkaConnector.TopicColon=Topic:
+
+KafkaConnector.PleaseSupplyValidIP=Please supply a valid Kafka IP
+KafkaConnector.PleaseSupplyValidPort=Please supply a valid port
+KafkaConnector.PleaseSupplyValidTopic=Please supply a valid topic name
+
+KafkaConnector.Kafka=Kafka
Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_ja_JP.properties
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_ja_JP.properties?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_ja_JP.properties (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_ja_JP.properties Wed Oct 14 07:18:37 2015
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Add stuff as needed
Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_zh_CN.properties
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_zh_CN.properties?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_zh_CN.properties (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_zh_CN.properties Wed Oct 14 07:18:37 2015
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Add stuff as needed
Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/editConfiguration.js
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/editConfiguration.js?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/editConfiguration.js (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/editConfiguration.js Wed Oct 14 07:18:37 2015
@@ -0,0 +1,73 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<script type="text/javascript">
+<!--
+function checkConfig() {
+ if (editconnection.ip) {
+ if (editconnection.ip.value == "") {
+ alert("$Encoder.bodyJavascriptEscape($ResourceBundle.getString('KafkaConnector.PleaseSupplyValidKafkaLocation'))");
+ editconnection.ip.focus();
+ return false;
+ }
+ }
+ if (editconnection.port) {
+ if (editconnection.port.value == "") {
+ alert("$Encoder.bodyJavascriptEscape($ResourceBundle.getString('KafkaConnector.PleaseSupplyValidPort'))");
+ editconnection.port.focus();
+ return false;
+ }
+ }
+ if (editconnection.topic) {
+ if (editconnection.topic.value == "") {
+ alert("$Encoder.bodyJavascriptEscape($ResourceBundle.getString('KafkaConnector.PleaseSupplyValidTopic'))");
+ editconnection.topic.focus();
+ return false;
+ }
+ }
+ return true;
+}
+
+function checkConfigForSave() {
+ if (editconnection.ip) {
+ if (editconnection.ip.value == "") {
+ alert("$Encoder.bodyJavascriptEscape($ResourceBundle.getString('KafkaConnector.PleaseSupplyValidIP'))");
+ SelectTab("$Encoder.javascriptBodyEscape($ResourceBundle.getString('KafkaConnector.Parameters'))");
+ editconnection.ip.focus();
+ return false;
+ }
+ }
+ if (editconnection.port) {
+ if (editconnection.port.value == "") {
+ alert("$Encoder.bodyJavascriptEscape($ResourceBundle.getString('KafkaConnector.PleaseSupplyValidPort'))");
+ SelectTab("$Encoder.javascriptBodyEscape($ResourceBundle.getString('KafkaConnector.Parameters'))");
+ editconnection.port.focus();
+ return false;
+ }
+ }
+ if (editconnection.topic) {
+ if (editconnection.topic.value == "") {
+ alert("$Encoder.bodyJavascriptEscape($ResourceBundle.getString('KafkaConnector.PleaseSupplyValidTopic'))");
+ SelectTab("$Encoder.javascriptBodyEscape($ResourceBundle.getString('KafkaConnector.Parameters'))");
+ editconnection.topic.focus();
+ return false;
+ }
+ }
+ return true;
+}
+//-->
+</script>
Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/editConfiguration_Parameters.html
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/editConfiguration_Parameters.html?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/editConfiguration_Parameters.html (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/editConfiguration_Parameters.html Wed Oct 14 07:18:37 2015
@@ -0,0 +1,48 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+#if($TABNAME == $ResourceBundle.getString('KafkaConnector.Parameters'))
+
+<table class="displaytable">
+ <tr>
+ <td class="description">
+ <nobr>$Encoder.bodyEscape($ResourceBundle.getString('KafkaConnector.IPColon'))</nobr>
+
+</td>
+<td class="value"><input name="ip" type="text"
+ value="$Encoder.attributeEscape($IP)" size="48" />
+</td>
+</tr>
+<tr>
+ <td class="description"><nobr>$Encoder.bodyEscape($ResourceBundle.getString('KafkaConnector.PortColon'))</nobr></td>
+<td class="value"><input name="port" type="text" value="$Encoder.attributeEscape($PORT)"
+ size="24" /></td>
+</tr>
+<tr>
+ <td class="description"><nobr>$Encoder.bodyEscape($ResourceBundle.getString('KafkaConnector.TopicColon'))</nobr></td>
+<td class="value"><input name="topic" type="text" value="$Encoder.attributeEscape($TOPIC)"
+ size="24" /></td>
+</tr>
+</table>
+
+#else
+
+<input type="hidden" name="ip" value="$Encoder.attributeEscape($IP)" />
+<input type="hidden" name="port" value="$Encoder.attributeEscape($PORT)" />
+<input type="hidden" name="topic" value="$Encoder.attributeEscape($TOPIC)" />
+
+#end
Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/viewConfiguration.html
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/viewConfiguration.html?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/viewConfiguration.html (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/viewConfiguration.html Wed Oct 14 07:18:37 2015
@@ -0,0 +1,32 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<table class="displaytable">
+ <tr>
+ <td class="description"><nobr>$Encoder.bodyEscape($ResourceBundle.getString('KafkaConnector.IPColon'))</nobr></td>
+
+ <td class="value">$Encoder.bodyEscape($IP)</td>
+</tr>
+<tr>
+ <td class="description"><nobr>$Encoder.bodyEscape($ResourceBundle.getString('KafkaConnector.PortColon'))</nobr></td>
+<td class="value">$Encoder.bodyEscape($PORT)</td>
+</tr>
+<tr>
+ <td class="description"><nobr>$Encoder.bodyEscape($ResourceBundle.getString('KafkaConnector.TopicColon'))</nobr></td>
+<td class="value">$Encoder.bodyEscape($TOPIC)</td>
+</tr>
+</table>
\ No newline at end of file
Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/APISanityHSQLDBIT.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/APISanityHSQLDBIT.java?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/APISanityHSQLDBIT.java (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/APISanityHSQLDBIT.java Wed Oct 14 07:18:37 2015
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.manifoldcf.agents.output.kafka;
+
+import org.apache.manifoldcf.core.interfaces.Configuration;
+import org.apache.manifoldcf.core.interfaces.ConfigurationNode;
+import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
+import org.apache.manifoldcf.crawler.system.ManifoldCF;
+import org.junit.Test;
+
+/**
+ * @author tugba
+ */
+public class APISanityHSQLDBIT extends BaseITHSQLDB {
+
+ @Test
+ public void sanityCheck()
+ throws Exception {
+ try {
+ int i;
+
+ /* End-to-end sanity check driven through the API: create a repository
+ * connection and a Kafka output connection, create a job joining the two,
+ * run the job, verify the processed-document count, then delete the job.
+ *
+ * Step 1: create a repository connection that uses the
+ * TestingRepositoryConnector, and save it. */
+ ConfigurationNode connectionObject;
+ ConfigurationNode child;
+ Configuration requestObject;
+ Configuration result;
+ connectionObject = new ConfigurationNode("repositoryconnection");
+
+ child = new ConfigurationNode("name");
+ child.setValue("Test Connection");
+ connectionObject.addChild(connectionObject.getChildCount(), child);
+
+ child = new ConfigurationNode("class_name");
+ child.setValue("org.apache.manifoldcf.crawler.tests.TestingRepositoryConnector");
+ connectionObject.addChild(connectionObject.getChildCount(), child);
+
+ child = new ConfigurationNode("description");
+ child.setValue("Test Connection");
+ connectionObject.addChild(connectionObject.getChildCount(), child);
+
+ child = new ConfigurationNode("max_connections");
+ child.setValue("10");
+ connectionObject.addChild(connectionObject.getChildCount(), child);
+
+ child = new ConfigurationNode("configuration");
+
+ // Testing Repository Connector parameters: none are set, the configuration node stays empty.
+ // The commented-out block below is leftover CMIS sample code.
+ /*
+ ConfigurationNode cmisBindingNode = new ConfigurationNode("_PARAMETER_");
+ cmisBindingNode.setAttribute("name", CmisConfig.BINDING_PARAM);
+ cmisBindingNode.setValue(CmisConfig.BINDING_DEFAULT_VALUE);
+ child.addChild(child.getChildCount(), cmisBindingNode);
+ */
+ connectionObject.addChild(connectionObject.getChildCount(), child);
+
+ requestObject = new Configuration();
+ requestObject.addChild(0, connectionObject);
+
+ result = performAPIPutOperationViaNodes("repositoryconnections/Test%20Connection", 201, requestObject);
+
+ i = 0;
+ while (i < result.getChildCount()) {
+ ConfigurationNode resultNode = result.findChild(i++);
+ if (resultNode.getType().equals("error")) {
+ throw new Exception(resultNode.getValue());
+ }
+ }
+
+ // Step 2: create a Kafka output connection, and save it.
+ connectionObject = new ConfigurationNode("outputconnection");
+
+ child = new ConfigurationNode("name");
+ child.setValue("Kafka");
+ connectionObject.addChild(connectionObject.getChildCount(), child);
+
+ child = new ConfigurationNode("class_name");
+ child.setValue("org.apache.manifoldcf.agents.output.kafka.KafkaOutputConnector");
+ connectionObject.addChild(connectionObject.getChildCount(), child);
+
+ child = new ConfigurationNode("description");
+ child.setValue("Kafka Connection");
+ connectionObject.addChild(connectionObject.getChildCount(), child);
+
+ child = new ConfigurationNode("max_connections");
+ child.setValue("100");
+ connectionObject.addChild(connectionObject.getChildCount(), child);
+
+ child = new ConfigurationNode("configuration");
+
+ // Kafka Output Connector parameters, each added as a named _PARAMETER_ node
+ // IP
+ ConfigurationNode ip = new ConfigurationNode("_PARAMETER_");
+ ip.setAttribute("name", "ip");
+ ip.setValue("localhost");
+ child.addChild(child.getChildCount(), ip);
+ // port
+ ConfigurationNode port = new ConfigurationNode("_PARAMETER_");
+ port.setAttribute("name", "port");
+ port.setValue("9092");
+ child.addChild(child.getChildCount(), port);
+ // topic
+ ConfigurationNode topic = new ConfigurationNode("_PARAMETER_");
+ topic.setAttribute("name", "topic");
+ topic.setValue("topic");
+ child.addChild(child.getChildCount(), topic);
+ connectionObject.addChild(connectionObject.getChildCount(), child);
+ requestObject = new Configuration();
+ requestObject.addChild(0, connectionObject);
+ result = performAPIPutOperationViaNodes("outputconnections/Kafka", 201, requestObject);
+ i = 0;
+ while (i < result.getChildCount()) {
+ ConfigurationNode resultNode = result.findChild(i++);
+ if (resultNode.getType().equals("error")) {
+ throw new Exception(resultNode.getValue());
+ }
+ }
+ // Step 3: create a job that reads from "Test Connection" and outputs to "Kafka".
+ ConfigurationNode jobObject = new ConfigurationNode("job");
+ child = new ConfigurationNode("description");
+ child.setValue("Test Job");
+ jobObject.addChild(jobObject.getChildCount(), child);
+
+ child = new ConfigurationNode("repository_connection");
+ child.setValue("Test Connection");
+ jobObject.addChild(jobObject.getChildCount(), child);
+
+ // The output connection is attached as a pipeline stage (stage 0, flagged as an output stage).
+ child = new ConfigurationNode("pipelinestage");
+ ConfigurationNode pipelineChild = new ConfigurationNode("stage_id");
+ pipelineChild.setValue("0");
+ child.addChild(child.getChildCount(), pipelineChild);
+ pipelineChild = new ConfigurationNode("stage_isoutput");
+ pipelineChild.setValue("true");
+ child.addChild(child.getChildCount(), pipelineChild);
+ pipelineChild = new ConfigurationNode("stage_connectionname");
+ pipelineChild.setValue("Kafka");
+ child.addChild(child.getChildCount(), pipelineChild);
+ jobObject.addChild(jobObject.getChildCount(), child);
+ child = new ConfigurationNode("run_mode");
+ child.setValue("scan once");
+ jobObject.addChild(jobObject.getChildCount(), child);
+ child = new ConfigurationNode("start_mode");
+ child.setValue("manual");
+ jobObject.addChild(jobObject.getChildCount(), child);
+ child = new ConfigurationNode("hopcount_mode");
+ child.setValue("accurate");
+ jobObject.addChild(jobObject.getChildCount(), child);
+
+ child = new ConfigurationNode("document_specification"); // left empty
+
+ jobObject.addChild(jobObject.getChildCount(), child);
+ requestObject = new Configuration();
+ requestObject.addChild(0, jobObject);
+ result = performAPIPostOperationViaNodes("jobs", 201, requestObject);
+ String jobIDString = null;
+ i = 0;
+ while (i < result.getChildCount()) {
+ ConfigurationNode resultNode = result.findChild(i++);
+ if (resultNode.getType().equals("error")) {
+ throw new Exception(resultNode.getValue());
+ } else if (resultNode.getType().equals("job_id")) {
+ jobIDString = resultNode.getValue();
+ }
+ }
+ if (jobIDString == null) {
+ throw new Exception("Missing job_id from return!");
+ }
+ // Step 4: start the job, and wait until it completes.
+ startJob(jobIDString);
+ waitJobInactive(jobIDString, 120000L); // second argument is presumably a timeout in milliseconds - TODO confirm
+
+ // Check to be sure we actually processed the right number of documents.
+ // The assertion below expects exactly 3 processed documents.
+ long count;
+ count = getJobDocumentsProcessed(jobIDString);
+ if (count != 3) {
+ throw new ManifoldCFException("Wrong number of documents processed - expected 3, saw " + new Long(count).toString());
+ }
+
+ // Step 5: delete the job.
+ deleteJob(jobIDString);
+
+ waitJobDeleted(jobIDString, 120000L);
+
+ // Cleanup is automatic by the base class, so we can feel free to leave jobs and connections lying around.
+ } catch (Exception e) {
+ e.printStackTrace(); // print the trace, then rethrow so the test still fails
+ throw e;
+ }
+ }
+
+ protected void startJob(String jobIDString)
+ throws Exception {
+ Configuration requestObject = new Configuration();
+
+ Configuration result = performAPIPutOperationViaNodes("start/" + jobIDString, 201, requestObject);
+ int i = 0;
+ while (i < result.getChildCount()) {
+ ConfigurationNode resultNode = result.findChild(i++);
+ if (resultNode.getType().equals("error")) {
+ throw new Exception(resultNode.getValue());
+ }
+ }
+ }
+
+ protected void deleteJob(String jobIDString)
+ throws Exception {
+ Configuration result = performAPIDeleteOperationViaNodes("jobs/" + jobIDString, 200);
+ int i = 0;
+ while (i < result.getChildCount()) {
+ ConfigurationNode resultNode = result.findChild(i++);
+ if (resultNode.getType().equals("error")) {
+ throw new Exception(resultNode.getValue());
+ }
+ }
+
+ }
+
+ protected String getJobStatus(String jobIDString)
+ throws Exception {
+ Configuration result = performAPIGetOperationViaNodes("jobstatuses/" + jobIDString, 200);
+ String status = null;
+ int i = 0;
+ while (i < result.getChildCount()) {
+ ConfigurationNode resultNode = result.findChild(i++);
+ if (resultNode.getType().equals("error")) {
+ throw new Exception(resultNode.getValue());
+ } else if (resultNode.getType().equals("jobstatus")) {
+ int j = 0;
+ while (j < resultNode.getChildCount()) {
+ ConfigurationNode childNode = resultNode.findChild(j++);
+ if (childNode.getType().equals("status")) {
+ status = childNode.getValue();
+ }
+ }
+ }
+ }
+ return status;
+ }
+
+ /**
+  * Fetches {@code jobstatuses/<jobID>} and returns the job's {@code documents_processed} count.
+  *
+  * @param jobIDString identifier of the job to query
+  * @return the {@code documents_processed} value parsed as a long
+  * @throws Exception if the API helper fails, the response carries an "error" node, or no
+  *         {@code documents_processed} field is present; a NumberFormatException is thrown
+  *         when the field is not a valid long
+  */
+ protected long getJobDocumentsProcessed(String jobIDString)
+   throws Exception {
+   Configuration result = performAPIGetOperationViaNodes("jobstatuses/" + jobIDString, 200);
+   String documentsProcessed = null;
+   for (int i = 0; i < result.getChildCount(); i++) {
+     ConfigurationNode resultNode = result.findChild(i);
+     if (resultNode.getType().equals("error")) {
+       throw new Exception(resultNode.getValue());
+     } else if (resultNode.getType().equals("jobstatus")) {
+       // A later documents_processed entry overrides an earlier one.
+       for (int j = 0; j < resultNode.getChildCount(); j++) {
+         ConfigurationNode childNode = resultNode.findChild(j);
+         if (childNode.getType().equals("documents_processed")) {
+           documentsProcessed = childNode.getValue();
+         }
+       }
+     }
+   }
+   if (documentsProcessed == null) {
+     throw new Exception("Expected a documents_processed field, didn't find it");
+   }
+   // Parse straight to a primitive; the Long(String) constructor is deprecated
+   // and only added a needless boxing step.
+   return Long.parseLong(documentsProcessed);
+ }
+
+ /**
+  * Polls the job status about once per second until the job reports "done".
+  *
+  * @param jobIDString identifier of the job to watch
+  * @param maxTime maximum time to keep polling, in milliseconds
+  * @throws Exception if the job has no status, reports "not yet run" or "error", or
+  *         (as ManifoldCFException) does not reach "done" within {@code maxTime}
+  */
+ protected void waitJobInactive(String jobIDString, long maxTime)
+   throws Exception {
+   long startTime = System.currentTimeMillis();
+   while (System.currentTimeMillis() < startTime + maxTime) {
+     String status = getJobStatus(jobIDString);
+     if (status == null) {
+       throw new Exception("No such job: '" + jobIDString + "'");
+     }
+     if (status.equals("not yet run")) {
+       throw new Exception("Job was never started.");
+     }
+     if (status.equals("done")) {
+       return;
+     }
+     if (status.equals("error")) {
+       throw new Exception("Job reports error.");
+     }
+     // Any other status means the job is still active; wait before polling again.
+     ManifoldCF.sleep(1000L);
+   }
+   // String concatenation renders the long exactly as the former new Long(maxTime).toString().
+   throw new ManifoldCFException("ManifoldCF did not terminate in the allotted time of " + maxTime + " milliseconds");
+ }
+
+ /**
+  * Polls the job status about once per second until no status is returned for the job,
+  * which this method treats as the job having been deleted.
+  *
+  * @param jobIDString identifier of the job being deleted
+  * @param maxTime maximum time to keep polling, in milliseconds
+  * @throws Exception if the status lookup fails or (as ManifoldCFException) the job still
+  *         has a status after {@code maxTime}
+  */
+ protected void waitJobDeleted(String jobIDString, long maxTime)
+   throws Exception {
+   long startTime = System.currentTimeMillis();
+   while (System.currentTimeMillis() < startTime + maxTime) {
+     String status = getJobStatus(jobIDString);
+     if (status == null) {
+       return;
+     }
+     ManifoldCF.sleep(1000L);
+   }
+   // String concatenation renders the long exactly as the former new Long(maxTime).toString().
+   throw new ManifoldCFException("ManifoldCF did not delete in the allotted time of " + maxTime + " milliseconds");
+ }
+
+}
Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseHSQLDB.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseHSQLDB.java?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseHSQLDB.java (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseHSQLDB.java Wed Oct 14 07:18:37 2015
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.manifoldcf.agents.output.kafka;
+
+/**
+ * Testing base class that extends {@code ConnectorBaseHSQLDB} and supplies the
+ * name and implementation class of the Kafka output connector.
+ */
+public class BaseHSQLDB extends org.apache.manifoldcf.crawler.tests.ConnectorBaseHSQLDB {
+
+  /** @return a new one-element array holding the output connector name "Kafka" */
+  protected String[] getOutputNames() {
+    final String[] outputNames = {"Kafka"};
+    return outputNames;
+  }
+
+  /** @return a new one-element array holding the Kafka output connector's class name */
+  protected String[] getOutputClasses() {
+    final String[] outputClasses = {"org.apache.manifoldcf.agents.output.kafka.KafkaOutputConnector"};
+    return outputClasses;
+  }
+
+}
Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseITHSQLDB.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseITHSQLDB.java?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseITHSQLDB.java (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseITHSQLDB.java Wed Oct 14 07:18:37 2015
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.manifoldcf.agents.output.kafka;
+
+import java.util.Properties;
+import org.junit.After;
+import static org.junit.Assert.fail;
+import org.junit.Before;
+
+/**
+ * Integration-test base class that extends the crawler's {@code BaseITHSQLDB}, declares the
+ * repository and output connectors to use, and creates a {@link KafkaLocal} before each test
+ * and stops it afterwards.
+ */
+public class BaseITHSQLDB extends org.apache.manifoldcf.crawler.tests.BaseITHSQLDB {
+
+  /** Local broker created by {@link #setupKafka()}; reset to null by {@link #cleanUpKafka()}. */
+  static KafkaLocal kafka;
+
+  // NOTE(review): "CMIS" looks like a copy/paste leftover, since the class returned by
+  // getConnectorClasses() is the TestingRepositoryConnector. Left unchanged because the
+  // name is runtime data - confirm before renaming.
+  protected String[] getConnectorNames() {
+    return new String[]{"CMIS"};
+  }
+
+  protected String[] getConnectorClasses() {
+    return new String[]{"org.apache.manifoldcf.crawler.tests.TestingRepositoryConnector"};
+  }
+
+  protected String[] getOutputNames() {
+    return new String[]{"Kafka"};
+  }
+
+  protected String[] getOutputClasses() {
+    return new String[]{"org.apache.manifoldcf.agents.output.kafka.KafkaOutputConnector"};
+  }
+
+  /**
+   * Builds the broker and ZooKeeper properties in code and creates the local Kafka instance.
+   * Any exception is printed to standard output and converted into a test failure.
+   *
+   * @throws Exception declared for subclasses; failures in this body surface via {@code fail}
+   */
+  @Before
+  public void setupKafka()
+    throws Exception {
+    Properties kafkaProperties = new Properties();
+    Properties zkProperties = new Properties();
+
+    try {
+      // Broker settings
+      kafkaProperties.put("broker.id", "0");
+      kafkaProperties.put("port", "9092");
+      kafkaProperties.put("num.network.threads", "3");
+      kafkaProperties.put("num.io.threads", "8");
+      kafkaProperties.put("socket.send.buffer.bytes", "102400");
+      kafkaProperties.put("socket.receive.buffer.bytes", "102400");
+      kafkaProperties.put("socket.request.max.bytes", "104857600");
+      kafkaProperties.put("log.dirs", "/tmp/kafka-logs");
+      kafkaProperties.put("num.partitions", "1");
+      kafkaProperties.put("num.recovery.threads.per.data.dir", "1");
+      kafkaProperties.put("log.retention.hours", "168");
+      kafkaProperties.put("log.segment.bytes", "1073741824");
+      kafkaProperties.put("log.retention.check.interval.ms", "300000");
+      kafkaProperties.put("log.cleaner.enable", "false");
+      kafkaProperties.put("zookeeper.connect", "localhost:2181");
+      kafkaProperties.put("zookeeper.connection.timeout.ms", "6000");
+
+      // ZooKeeper settings
+      zkProperties.put("dataDir", "/tmp/zookeeper");
+      zkProperties.put("clientPort", "2181");
+      zkProperties.put("maxClientCnxns", "0");
+
+      System.out.println("Kafka is starting...");
+
+      //start kafka
+      kafka = new KafkaLocal(kafkaProperties, zkProperties);
+      // Fixed pause after creating the broker, before the test body runs.
+      Thread.sleep(5000);
+    } catch (Exception e) {
+      // Print first: fail() always throws, so nothing after it would run.
+      e.printStackTrace(System.out);
+      fail("Error running local Kafka broker");
+    }
+  }
+
+  /**
+   * Stops the local Kafka instance if one was created. The null check keeps a failed
+   * {@link #setupKafka()} from adding a NullPointerException on top of the original failure.
+   */
+  @After
+  public void cleanUpKafka() {
+    if (kafka == null) {
+      return;
+    }
+    try {
+      kafka.stop();
+    } finally {
+      // Drop the reference so a later teardown cannot stop the same instance twice.
+      kafka = null;
+    }
+  }
+
+}
Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BasePostgresql.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BasePostgresql.java?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BasePostgresql.java (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BasePostgresql.java Wed Oct 14 07:18:37 2015
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.manifoldcf.agents.output.kafka;
+
+/**
+ * Testing base class that extends {@code ConnectorBasePostgresql} and supplies the
+ * name and implementation class of the Kafka output connector.
+ */
+public class BasePostgresql extends org.apache.manifoldcf.crawler.tests.ConnectorBasePostgresql {
+
+  /** @return a new one-element array holding the output connector name "Kafka" */
+  protected String[] getOutputNames() {
+    final String[] outputNames = {"Kafka"};
+    return outputNames;
+  }
+
+  /** @return a new one-element array holding the Kafka output connector's class name */
+  protected String[] getOutputClasses() {
+    final String[] outputClasses = {"org.apache.manifoldcf.agents.output.kafka.KafkaOutputConnector"};
+    return outputClasses;
+  }
+
+}
Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseUIHSQLDB.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseUIHSQLDB.java?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseUIHSQLDB.java (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseUIHSQLDB.java Wed Oct 14 07:18:37 2015
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.manifoldcf.agents.output.kafka;
+
+/**
+ * Testing base class that extends {@code ConnectorBaseUIHSQLDB} and supplies the names and
+ * implementation classes of the repository connector and the Kafka output connector.
+ */
+public class BaseUIHSQLDB extends org.apache.manifoldcf.crawler.tests.ConnectorBaseUIHSQLDB {
+
+  /** @return a new one-element array holding the repository connector name "Test Connector" */
+  protected String[] getConnectorNames() {
+    final String[] connectorNames = {"Test Connector"};
+    return connectorNames;
+  }
+
+  /** @return a new one-element array holding the testing repository connector's class name */
+  protected String[] getConnectorClasses() {
+    final String[] connectorClasses = {"org.apache.manifoldcf.crawler.tests.TestingRepositoryConnector"};
+    return connectorClasses;
+  }
+
+  /** @return a new one-element array holding the output connector name "Kafka" */
+  protected String[] getOutputNames() {
+    final String[] outputNames = {"Kafka"};
+    return outputNames;
+  }
+
+  /** @return a new one-element array holding the Kafka output connector's class name */
+  protected String[] getOutputClasses() {
+    final String[] outputClasses = {"org.apache.manifoldcf.agents.output.kafka.KafkaOutputConnector"};
+    return outputClasses;
+  }
+
+}
Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/KafkaConnectorTest.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/KafkaConnectorTest.java?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/KafkaConnectorTest.java (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/KafkaConnectorTest.java Wed Oct 14 07:18:37 2015
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.manifoldcf.agents.output.kafka;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.manifoldcf.agents.interfaces.IOutputAddActivity;
+import org.apache.commons.lang3.concurrent.ConcurrentUtils;
+
+import org.apache.manifoldcf.agents.interfaces.RepositoryDocument;
+import org.apache.manifoldcf.core.interfaces.VersionContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import org.mockito.runners.MockitoJUnitRunner;
+
+/**
+ * Unit test for {@link KafkaOutputConnector} that injects a Mockito mock in place of the
+ * Kafka producer and checks that adding a document results in a send on that producer.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaConnectorTest {
+
+  /** Mock producer handed to the connector under test. */
+  @Mock
+  private KafkaProducer producer;
+
+  /** Connector under test, recreated before every test method. */
+  private KafkaOutputConnector connector;
+
+  /** Creates the connector, injects the mock producer, and stubs {@code send} with a constant future. */
+  @Before
+  public void setup() throws Exception {
+    connector = new KafkaOutputConnector();
+    connector.setProducer(producer);
+
+    when(producer.send(Mockito.any(ProducerRecord.class))).thenReturn(ConcurrentUtils.constantFuture(true));
+  }
+
+  /** Adds a document through the connector and verifies the producer's {@code send} was invoked. */
+  @Test
+  public void whenSendingDocumenttoKafka() throws Exception {
+    final IOutputAddActivity activityMock = mock(IOutputAddActivity.class);
+    final VersionContext versionMock = mock(VersionContext.class);
+
+    final RepositoryDocument doc = new RepositoryDocument();
+    doc.setMimeType("text\'/plain");
+    doc.setFileName("test.txt");
+
+    // The JSON payload is built but not inspected here; the call is kept so the
+    // conversion still runs as part of the test.
+    final KafkaMessage message = new KafkaMessage();
+    byte[] payload = message.createJSON(doc);
+
+    connector.addOrReplaceDocumentWithException("", versionMock, doc, "", activityMock);
+    verify(producer).send(Mockito.any(ProducerRecord.class));
+  }
+}
Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/KafkaLocal.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/KafkaLocal.java?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/KafkaLocal.java (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/KafkaLocal.java Wed Oct 14 07:18:37 2015
@@ -0,0 +1,36 @@
+package org.apache.manifoldcf.agents.output.kafka;
+
+import java.io.IOException;
+import java.util.Properties;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+
+/**
+ * Test helper that creates a {@code ZooKeeperLocal} and then starts a Kafka broker through
+ * {@code KafkaServerStartable}, printing progress messages to standard output.
+ */
+public class KafkaLocal {
+
+  public KafkaServerStartable kafka;
+  public ZooKeeperLocal zookeeper;
+
+  /**
+   * Builds the broker configuration, creates the local ZooKeeper helper, then creates and
+   * starts the broker.
+   *
+   * @param kafkaProperties properties used to build the broker's {@code KafkaConfig}
+   * @param zkProperties properties handed to {@code ZooKeeperLocal}
+   * @throws IOException declared by this constructor (source not visible here - presumably from ZooKeeperLocal)
+   * @throws InterruptedException declared by this constructor (presumably from ZooKeeperLocal as well)
+   */
+  public KafkaLocal(Properties kafkaProperties, Properties zkProperties) throws IOException, InterruptedException {
+    // The broker configuration is built first, before anything is started.
+    final KafkaConfig brokerConfig = new KafkaConfig(kafkaProperties);
+
+    announce("starting local zookeeper...");
+    this.zookeeper = new ZooKeeperLocal(zkProperties);
+    announce("done");
+
+    this.kafka = new KafkaServerStartable(brokerConfig);
+    announce("starting local kafka broker...");
+
+    this.kafka.startup();
+    announce("done");
+  }
+
+  /**
+   * Shuts down the Kafka broker.
+   */
+  public void stop() {
+    // NOTE(review): only the broker is shut down; the zookeeper field is never touched
+    // here - confirm whether ZooKeeperLocal needs an explicit stop.
+    announce("stopping kafka...");
+    this.kafka.shutdown();
+    announce("done");
+  }
+
+  /** Writes a progress line to standard output. */
+  private static void announce(String message) {
+    System.out.println(message);
+  }
+
+}