You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2015/10/14 09:18:38 UTC

svn commit: r1708553 [1/2] - in /manifoldcf/branches/CONNECTORS-1162: ./ connectors/ connectors/kafka/ connectors/kafka/connector/ connectors/kafka/connector/src/ connectors/kafka/connector/src/main/ connectors/kafka/connector/src/main/java/ connectors...

Author: kwright
Date: Wed Oct 14 07:18:37 2015
New Revision: 1708553

URL: http://svn.apache.org/viewvc?rev=1708553&view=rev
Log:
Import kafka connector (except for a few wrong changes) from github

Added:
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/build.xml
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaConfig.java
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaMessage.java
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaOutputConnector.java
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/Messages.java
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_en_US.properties
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_ja_JP.properties
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_zh_CN.properties
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/editConfiguration.js
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/editConfiguration_Parameters.html
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/viewConfiguration.html
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/APISanityHSQLDBIT.java
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseHSQLDB.java
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseITHSQLDB.java
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BasePostgresql.java
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseUIHSQLDB.java
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/KafkaConnectorTest.java
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/KafkaLocal.java
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/NavigationHSQLDBUI.java
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/ZooKeeperLocal.java
    manifoldcf/branches/CONNECTORS-1162/connectors/kafka/pom.xml
Modified:
    manifoldcf/branches/CONNECTORS-1162/build.xml
    manifoldcf/branches/CONNECTORS-1162/connectors/pom.xml
    manifoldcf/branches/CONNECTORS-1162/framework/build.xml

Modified: manifoldcf/branches/CONNECTORS-1162/build.xml
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/build.xml?rev=1708553&r1=1708552&r2=1708553&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/build.xml (original)
+++ manifoldcf/branches/CONNECTORS-1162/build.xml Wed Oct 14 07:18:37 2015
@@ -1692,6 +1692,66 @@ Use Apache Forrest version forrest-0.9-d
             <fileset dir="build/download/apache-manifoldcf-elasticsearch-1.5-plugin-bin/elasticsearch-1.5-plugin-mcf-2.0"/>
         </copy>
     </target>
+	
+    <target name="download-kafka-client">
+        <mkdir dir="lib"/>
+        <antcall target="download-via-maven">
+            <param name="target" value="lib"/>
+            <param name="project-path" value="org/apache/kafka"/>
+            <param name="artifact-version" value="0.8.2.1"/>
+            <param name="artifact-name" value="kafka-clients"/>
+            <param name="artifact-type" value="jar"/>
+        </antcall>
+        <antcall target="download-via-maven">
+            <param name="target" value="lib"/>
+            <param name="project-path" value="org/apache/kafka"/>
+            <param name="artifact-version" value="0.8.2.1"/>
+            <param name="artifact-name" value="kafka_2.11"/>
+            <param name="artifact-type" value="jar"/>
+        </antcall>
+        <antcall target="download-via-maven">
+            <param name="target" value="lib"/>
+            <param name="project-path" value="org/apache/commons"/>
+            <param name="artifact-version" value="3.4"/>
+            <param name="artifact-name" value="commons-lang3"/>
+            <param name="artifact-type" value="jar"/>
+        </antcall>
+        <antcall target="download-via-maven">
+            <param name="target" value="lib"/>
+            <param name="project-path" value="com/yammer/metrics"/>
+            <param name="artifact-version" value="2.2.0"/>
+            <param name="artifact-name" value="metrics-core"/>
+            <param name="artifact-type" value="jar"/>
+        </antcall>
+        <antcall target="download-via-maven">
+            <param name="target" value="lib"/>
+            <param name="project-path" value="org/scala-lang"/>
+            <param name="artifact-version" value="2.11.0"/>
+            <param name="artifact-name" value="scala-library"/>
+            <param name="artifact-type" value="jar"/>
+        </antcall>
+        <antcall target="download-via-maven">
+            <param name="target" value="lib"/>
+            <param name="project-path" value="net/sf/jopt-simple"/>
+            <param name="artifact-version" value="3.2"/>
+            <param name="artifact-name" value="jopt-simple"/>
+            <param name="artifact-type" value="jar"/>
+        </antcall>
+        <antcall target="download-via-maven">
+            <param name="target" value="lib"/>
+            <param name="project-path" value="org/scala-lang/modules"/>
+            <param name="artifact-version" value="1.0.4"/>
+            <param name="artifact-name" value="scala-xml_2.11"/>
+            <param name="artifact-type" value="jar"/>
+        </antcall>
+        <antcall target="download-via-maven">
+            <param name="target" value="lib"/>
+            <param name="project-path" value="org/scala-lang/modules"/>
+            <param name="artifact-version" value="1.0.4"/>
+            <param name="artifact-name" value="scala-parser-combinators_2.11"/>
+            <param name="artifact-type" value="jar"/>
+        </antcall>
+    </target>
 
    <target name="download-dropbox-client">
         <mkdir dir="lib"/>
@@ -2298,7 +2358,7 @@ Use Apache Forrest version forrest-0.9-d
         </antcall>
     </target>
   
-    <target name="make-core-deps" depends="download-joda-time,download-aws-sdk,download-resteasy,download-jsoup,download-mockito,download-alfresco-webscript-plugin,download-alfresco-indexer-client,download-mongo-java-driver,download-jira-client,download-google-api-client,download-dropbox-client,download-solrj,download-zookeeper,download-httpcomponents,download-json,download-hsqldb,download-xerces,download-commons,download-elasticsearch-plugin,download-solr-plugins,download-sharepoint-plugins,download-jstl,download-xmlgraphics-commons,download-woodstox,download-xmlsec,download-xml-apis,download-wss4j,download-velocity,download-streambuffer,download-stax,download-servlet-api,download-xml-resolver,download-osgi,download-opensaml,download-mimepull,download-mail,download-log4j,download-junit,download-jaxws,download-glassfish,download-jaxb,download-tomcat,download-h2,download-h2-support,download-geronimo-specs,download-fop,download-postgresql,download-axis,download-saaj,download-wsdl4j,do
 wnload-castor,download-jetty,download-slf4j,download-xalan,download-activation,download-avalon-framework,download-poi,download-chemistry,download-ecj,download-hadoop,download-htrace,download-protobuf,download-tika,download-jackson">
+    <target name="make-core-deps" depends="download-kafka-client,download-joda-time,download-aws-sdk,download-resteasy,download-jsoup,download-mockito,download-alfresco-webscript-plugin,download-alfresco-indexer-client,download-mongo-java-driver,download-jira-client,download-google-api-client,download-dropbox-client,download-solrj,download-zookeeper,download-httpcomponents,download-json,download-hsqldb,download-xerces,download-commons,download-elasticsearch-plugin,download-solr-plugins,download-sharepoint-plugins,download-jstl,download-xmlgraphics-commons,download-woodstox,download-xmlsec,download-xml-apis,download-wss4j,download-velocity,download-streambuffer,download-stax,download-servlet-api,download-xml-resolver,download-osgi,download-opensaml,download-mimepull,download-mail,download-log4j,download-junit,download-jaxws,download-glassfish,download-jaxb,download-tomcat,download-h2,download-h2-support,download-geronimo-specs,download-fop,download-postgresql,download-axis,download-s
 aaj,download-wsdl4j,download-castor,download-jetty,download-slf4j,download-xalan,download-activation,download-avalon-framework,download-poi,download-chemistry,download-ecj,download-hadoop,download-htrace,download-protobuf,download-tika,download-jackson">
         <copy todir="lib">
             <fileset dir="lib-license" includes="*.txt"/>
         </copy>
@@ -2354,4 +2414,4 @@ Use Apache Forrest version forrest-0.9-d
         <ant dir="site" target="download-cleanup"/>
     </target>
     
-</project>
+</project>
\ No newline at end of file

Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/build.xml
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/build.xml?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/build.xml (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/build.xml Wed Oct 14 07:18:37 2015
@@ -0,0 +1,48 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project name="kafka" default="all">
+	<presetdef name="javac">
+    <javac includeantruntime="false" />
+  </presetdef>
+    <!-- mcf-dist: taken from the MCFDISTPATH environment variable when set, otherwise ../../dist -->
+    <property environment="env"/>
+    <condition property="mcf-dist" value="${env.MCFDISTPATH}">
+        <isset property="env.MCFDISTPATH"/>
+    </condition>
+    <property name="abs-dist" location="../../dist"/>
+    <condition property="mcf-dist" value="${abs-dist}">
+        <not>
+            <isset property="env.MCFDISTPATH"/>
+        </not>
+    </condition>
+
+    <import file="${mcf-dist}/connector-build.xml"/>
+
+    <target name="deliver-connector" depends="mcf-connector-build.deliver-connector">
+        <antcall target="general-add-output-connector">
+            <param name="connector-label" value="Kafka"/>
+            <param name="connector-class" value="org.apache.manifoldcf.agents.output.kafka.KafkaOutputConnector"/>
+        </antcall>
+    </target>
+    <!-- Test classpath: the shared connector test classpath plus every jar in ../../lib -->
+    <path id="connector-test-classpath">
+        <path refid="mcf-connector-build.connector-test-classpath"/>
+        <fileset file="../../lib/*.jar"/>
+    </path>
+
+</project>

Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaConfig.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaConfig.java?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaConfig.java (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaConfig.java Wed Oct 14 07:18:37 2015
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.manifoldcf.agents.output.kafka;
+
+public class KafkaConfig {
+  // Names of the configuration parameters of a Kafka output connection
+  public static final String IP = "ip";
+  public static final String PORT = "port";
+  public static final String TOPIC = "topic";
+
+  // Default value for each of the parameters above
+  public static final String IP_DEFAULT = "localhost";
+  public static final String PORT_DEFAULT = "9092";
+  public static final String TOPIC_DEFAULT = "topic";
+}

Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaMessage.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaMessage.java?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaMessage.java (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaMessage.java Wed Oct 14 07:18:37 2015
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.manifoldcf.agents.output.kafka;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import org.apache.commons.io.IOUtils;
+import org.apache.manifoldcf.agents.interfaces.RepositoryDocument;
+import static org.apache.manifoldcf.agents.output.kafka.KafkaOutputConnector.allowAttributeName;
+import static org.apache.manifoldcf.agents.output.kafka.KafkaOutputConnector.denyAttributeName;
+import static org.apache.manifoldcf.agents.output.kafka.KafkaOutputConnector.noSecurityToken;
+import static org.apache.manifoldcf.agents.output.kafka.KafkaOutputConnector.useNullValue;
+import org.apache.manifoldcf.core.common.Base64;
+
+/**
+ * Serializes a RepositoryDocument into the UTF-8 JSON message sent to Kafka.
+ * @author tugba
+ */
+public class KafkaMessage {
+
+  // Placeholders: no ACLs are supplied yet, so writeACLs sees null at every level.
+  private final String[] acls = null;
+  private final String[] denyAcls = null;
+  private final String[] shareAcls = null;
+  private final String[] shareDenyAcls = null;
+  private final String[] parentAcls = null;
+  private final String[] parentDenyAcls = null;
+
+  /**
+   * Build the JSON message for one document: its metadata fields, the allow and
+   * deny tokens for the document, share and parent levels and, when a binary
+   * stream is present, a "file" object holding the Base64-encoded content.
+   * Failure handling is best effort: the stack trace is printed and whatever
+   * reached the buffer (possibly nothing, possibly truncated JSON) is returned.
+   *
+   * @param document the document to serialize; its stream is read, not closed.
+   * @return the message bytes, encoded as UTF-8.
+   */
+  public byte[] createJSON(RepositoryDocument document) {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    try {
+      InputStream inputStream = document.getBinaryStream();
+      PrintWriter pw = new PrintWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
+
+      pw.print("{");
+      boolean needComma = false;
+      Iterator<String> i = document.getFields();
+      while (i.hasNext()) {
+        String fieldName = i.next();
+        needComma = writeField(pw, needComma, fieldName, document.getFieldAsStrings(fieldName));
+      }
+
+      needComma = writeACLs(pw, needComma, "document", acls, denyAcls);
+      needComma = writeACLs(pw, needComma, "share", shareAcls, shareDenyAcls);
+      needComma = writeACLs(pw, needComma, "parent", parentAcls, parentDenyAcls);
+
+      if (inputStream != null) {
+        if (needComma) {
+          pw.print(",");
+        }
+        // No "type" : "attachment" member is written: see CONNECTORS-690
+        pw.print("\"file\" : {");
+        String contentType = document.getMimeType();
+        if (contentType != null) {
+          pw.print("\"_content_type\" : " + jsonStringEscape(contentType) + ",");
+        }
+        String fileName = document.getFileName();
+        if (fileName != null) {
+          pw.print("\"_name\" : " + jsonStringEscape(fileName) + ",");
+        }
+        pw.print(" \"_content\" : \"");
+        new Base64().encodeStream(inputStream, pw);
+        pw.print("\"}");
+      }
+      pw.print("}");
+      pw.close(); // also flushes the encoder into the buffer
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    return out.toByteArray();
+  }
+
+  /**
+   * Write one member, "name" : value, preceded by a comma when needed. A single
+   * value is written as a JSON string, several values as an array of strings;
+   * a null or empty value array writes nothing.
+   *
+   * @param needComma true if the object being written already has a member.
+   * @return true if the object now has a member, so the next one needs a comma.
+   */
+  protected static boolean writeField(PrintWriter pw, boolean needComma,
+          String fieldName, String[] fieldValues) throws IOException {
+    if (fieldValues == null || fieldValues.length == 0) {
+      return needComma;
+    }
+    if (needComma) {
+      pw.print(",");
+    }
+    String rendered;
+    if (fieldValues.length == 1) {
+      rendered = jsonStringEscape(fieldValues[0]);
+    } else {
+      StringBuilder sb = new StringBuilder("[");
+      for (int j = 0; j < fieldValues.length; j++) {
+        if (j > 0) {
+          sb.append(",");
+        }
+        sb.append(jsonStringEscape(fieldValues[j]));
+      }
+      rendered = sb.append("]").toString();
+    }
+    pw.print(jsonStringEscape(fieldName) + " : " + rendered);
+    return true;
+  }
+
+  /**
+   * Output an acl level. A null or empty token list is written as the
+   * no-security token, unless useNullValue is set, which omits the member.
+   */
+  protected static boolean writeACLs(PrintWriter pw, boolean needComma,
+          String aclType, String[] acl, String[] denyAcl) throws IOException {
+    String metadataACLName = allowAttributeName + aclType;
+    if (acl != null && acl.length > 0) {
+      needComma = writeField(pw, needComma, metadataACLName, acl);
+    } else if (!useNullValue) {
+      needComma = writeField(pw, needComma, metadataACLName, new String[]{noSecurityToken});
+    }
+    String metadataDenyACLName = denyAttributeName + aclType;
+    if (denyAcl != null && denyAcl.length > 0) {
+      needComma = writeField(pw, needComma, metadataDenyACLName, denyAcl);
+    } else if (!useNullValue) {
+      needComma = writeField(pw, needComma, metadataDenyACLName, new String[]{noSecurityToken});
+    }
+    return needComma;
+  }
+
+  /**
+   * Quote a string as a JSON string literal. Quote, backslash and slash get a
+   * backslash prefix; newline, return, tab, backspace and form feed use their
+   * short escapes; every other character below U+0020 becomes a four-digit
+   * hexadecimal unicode escape, because JSON forbids raw control characters.
+   *
+   * @param value the text to encode; must not be null.
+   */
+  protected static String jsonStringEscape(String value) {
+    StringBuilder sb = new StringBuilder("\"");
+    for (int i = 0; i < value.length(); i++) {
+      char x = value.charAt(i);
+      int shortForm = "\n\r\t\b\f".indexOf(x);
+      if (shortForm >= 0) {
+        sb.append('\\').append("nrtbf".charAt(shortForm));
+      } else if (x < 0x20) {
+        sb.append(String.format("\\u%04x", (int) x));
+      } else {
+        if (x == '\"' || x == '\\' || x == '/') {
+          sb.append('\\');
+        }
+        sb.append(x);
+      }
+    }
+    return sb.append("\"").toString();
+  }
+}

Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaOutputConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaOutputConnector.java?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaOutputConnector.java (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/KafkaOutputConnector.java Wed Oct 14 07:18:37 2015
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.manifoldcf.agents.output.kafka;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.agents.interfaces.*;
+
+import java.util.*;
+import java.io.*;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.PartitionInfo;
+
+/**
+ * This is a kafka output connector.
+ */
+public class KafkaOutputConnector extends org.apache.manifoldcf.agents.output.BaseOutputConnector {
+
+  public static final String _rcsid = "@(#)$Id: KafkaOutputConnector.java 988245 2010-08-23 18:39:35Z kwright $";
+
+  // Activities we log
+  /** Ingestion activity */
+  public final static String INGEST_ACTIVITY = "document ingest";
+
+  /** Job notify activity */
+  public final static String JOB_COMPLETE_ACTIVITY = "output notification";
+
+  /** Message key for the localized label of the parameters tab */
+  private final static String KAFKA_TAB_PARAMETERS = "KafkaConnector.Parameters";
+
+  /**
+   * Forward to the javascript to check the configuration parameters
+   */
+  private static final String EDIT_CONFIG_HEADER_FORWARD = "editConfiguration.js";
+
+  /**
+   * Forward to the HTML template to edit the configuration parameters
+   */
+  private static final String EDIT_CONFIG_FORWARD_PARAMETERS = "editConfiguration_Parameters.html";
+
+  /**
+   * Forward to the HTML template to view the configuration parameters
+   */
+  private static final String VIEW_CONFIG_FORWARD = "viewConfiguration.html";
+
+  /**
+   * Field name for file body text. NOTE(review): the original comment called
+   * this a "cloudsearch field name"; confirm the constant is still needed here.
+   */
+  private static final String FILE_BODY_TEXT_FIELDNAME = "f_bodytext";
+
+  /**
+   * Field name we use for document's URI.
+   */
+  private static final String DOCUMENT_URI_FIELDNAME = "document_URI";
+
+  /**
+   * The allow attribute name (a prefix: KafkaMessage.writeACLs appends the ACL level)
+   */
+  protected final static String allowAttributeName = "allow_token_";
+  /**
+   * The deny attribute name (a prefix: KafkaMessage.writeACLs appends the ACL level)
+   */
+  protected final static String denyAttributeName = "deny_token_";
+  /**
+   * The no-security token
+   */
+  protected final static String noSecurityToken = "__nosecurity__";
+
+  /** When false, KafkaMessage.writeACLs writes noSecurityToken in place of an empty ACL */
+  protected final static boolean useNullValue = false;
+
+  /** Kafka producer held by this connector; assigned by connect() and setProducer() */
+  KafkaProducer producer = null;
+
+  /**
+   * Constructor. Does no work of its own; the producer field is set later by connect() or setProducer().
+   */
+  public KafkaOutputConnector() {
+  }
+
+  public void setProducer(KafkaProducer producer) {
+    this.producer = producer; // replaces the held producer without closing it
+  }
+
+  /**
+   * Report the activities this connector may write to the log.
+   *
+   * @return a fresh array naming the ingest and job-complete activities.
+   */
+  @Override
+  public String[] getActivitiesList() {
+    final String[] supported = {INGEST_ACTIVITY, JOB_COMPLETE_ACTIVITY};
+    return supported;
+  }
+
+  /**
+   * Connect. Creates the Kafka producer for the configured broker.
+   *
+   * @param configParameters is the set of configuration parameters, which in
+   * this case give the broker IP and port; an unset value falls back to the
+   * KafkaConfig default rather than producing a "null:..." address.
+   */
+  @Override
+  public void connect(ConfigParams configParameters) {
+    super.connect(configParameters);
+
+    String ip = params.getParameter(KafkaConfig.IP);
+    String port = params.getParameter(KafkaConfig.PORT);
+    String broker = (ip != null ? ip : KafkaConfig.IP_DEFAULT)
+            + ":" + (port != null ? port : KafkaConfig.PORT_DEFAULT);
+    Properties props = new Properties();
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
+    props.put(ProducerConfig.RETRIES_CONFIG, "3");
+    props.put(ProducerConfig.ACKS_CONFIG, "all");
+    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
+    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 200);
+    props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, true);
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+    producer = new KafkaProducer(props);
+  }
+
+  /** Close the connection and its Kafka producer. Call this before discarding the connection. */
+  @Override
+  public void disconnect() throws ManifoldCFException {
+    super.disconnect();
+    if (producer != null) {
+      producer.close();
+    }
+  }
+
+  /**
+   * Fill in a Server tab configuration parameter map for calling a Velocity
+   * template. Unset parameters are replaced by the KafkaConfig defaults.
+   *
+   * @param newMap is the map to fill in; receives the keys IP, PORT and TOPIC
+   * @param mapper is the password mapper; it is not used here
+   * @param parameters is the current set of configuration parameters
+   */
+  private static void fillInServerConfigurationMap(Map<String, Object> newMap, IPasswordMapperActivity mapper, ConfigParams parameters) {
+    String ip = parameters.getParameter(KafkaConfig.IP);
+    if (ip == null) {
+      ip = KafkaConfig.IP_DEFAULT;
+    }
+    String port = parameters.getParameter(KafkaConfig.PORT);
+    if (port == null) {
+      port = KafkaConfig.PORT_DEFAULT;
+    }
+    String topic = parameters.getParameter(KafkaConfig.TOPIC);
+    if (topic == null) {
+      topic = KafkaConfig.TOPIC_DEFAULT;
+    }
+
+    newMap.put("IP", ip);
+    newMap.put("PORT", port);
+    newMap.put("TOPIC", topic);
+  }
+
+  @Override
+  public void outputConfigurationHeader(IThreadContext threadContext,
+          IHTTPOutput out, Locale locale, ConfigParams parameters,
+          List<String> tabsArray) throws ManifoldCFException, IOException {
+    // Register the connector's tab under its localized name
+    final String localizedTabName = Messages.getString(locale, KAFKA_TAB_PARAMETERS);
+    tabsArray.add(localizedTabName);
+
+    // Build the Velocity context from the current configuration
+    final Map<String, Object> velocityContext = new HashMap<String, Object>();
+    fillInServerConfigurationMap(velocityContext, out, parameters);
+
+    // A single Velocity template supplies the Javascript for all tabs
+    Messages.outputResourceWithVelocity(out, locale, EDIT_CONFIG_HEADER_FORWARD, velocityContext);
+  }
+
+  @Override
+  public void outputConfigurationBody(IThreadContext threadContext,
+          IHTTPOutput out, Locale locale, ConfigParams parameters, String tabName)
+          throws ManifoldCFException, IOException {
+    final Map<String, Object> velocityContext = new HashMap<String, Object>();
+
+    // Expose the currently selected tab name to the template
+    velocityContext.put("TABNAME", tabName);
+
+    // Add the IP, port and topic values
+    fillInServerConfigurationMap(velocityContext, out, parameters);
+
+    // Render the Parameters tab template
+    Messages.outputResourceWithVelocity(out, locale, EDIT_CONFIG_FORWARD_PARAMETERS, velocityContext);
+  }
+
+  @Override
+  public void viewConfiguration(IThreadContext threadContext, IHTTPOutput out,
+          Locale locale, ConfigParams parameters) throws ManifoldCFException,
+          IOException {
+    // Build the Velocity context with the IP, port and topic values
+    final Map<String, Object> velocityContext = new HashMap<String, Object>();
+    fillInServerConfigurationMap(velocityContext, out, parameters);
+
+    // Render the read-only view template
+    Messages.outputResourceWithVelocity(out, locale, VIEW_CONFIG_FORWARD, velocityContext);
+  }
+
+  @Override
+  public String processConfigurationPost(IThreadContext threadContext,
+          IPostParameters variableContext, ConfigParams parameters)
+          throws ManifoldCFException {
+    // Server tab parameters
+    String IP = variableContext.getParameter(KafkaConfig.IP);
+    if (IP != null) {
+      parameters.setParameter(KafkaConfig.IP, IP);
+    }
+    String port = variableContext.getParameter(KafkaConfig.PORT);
+    if (port != null) {
+      parameters.setParameter(KafkaConfig.PORT, port);
+    }
+    String topic = variableContext.getParameter(KafkaConfig.TOPIC);
+    if (topic != null) {
+      parameters.setParameter(KafkaConfig.TOPIC, topic);
+    }
+    return null;
+  }
+
+  /**
+   * Test the connection. Returns a string describing the connection integrity.
+   * The test asks the Kafka producer for the partitions of the configured
+   * topic (or of the default topic "topic" when none is configured) and then
+   * defers to the base class check.
+   *
+   * @return the connection's status as a displayable string; a failure is
+   * reported as "Connection failed: " followed by the exception message.
+   */
+  @Override
+  public String check()
+          throws ManifoldCFException {
+    try {
+      // The partition list itself is not needed; the call is made only to
+      // find out whether it succeeds.
+      producer.partitionsFor(getConfig(params, KafkaConfig.TOPIC, "topic"));
+      return super.check();
+    } catch (ManifoldCFException e) {
+      return "Connection failed: " + e.getMessage();
+    } catch (org.apache.kafka.common.KafkaException e) {
+      // Kafka client errors are unchecked; report them as a status string
+      // instead of letting them escape from the check.
+      return "Connection failed: " + e.getMessage();
+    }
+  }
+
+  /**
+   * Get an output version string, given an output specification. The output
+   * version string is used to uniquely describe the pertinent details of the
+   * output specification and the configuration, to allow the Connector
+   * Framework to determine whether a document will need to be output again.
+   * Note that the contents of the document cannot be considered by this method,
+   * and that a different version string (defined in IRepositoryConnector) is
+   * used to describe the version of the actual document.
+   *
+   * This method presumes that the connector object has been configured, and it
+   * is thus able to communicate with the output data store should that be
+   * necessary.
+   *
+   * @param spec is the current output specification for the job that is doing
+   * the crawling.
+   * @return a string, of unlimited length, which uniquely describes output
+   * configuration and specification in such a way that if two such strings are
+   * equal, the document will not need to be sent again to the output data
+   * store.
+   */
+  @Override
+  public VersionContext getPipelineDescription(Specification spec)
+          throws ManifoldCFException, ServiceInterruption {
+    // The version string is always empty
+    final String versionString = "";
+    return new VersionContext(versionString, params, spec);
+  }
+
+  /**
+   * Add (or replace) a document in the output data store using the connector.
+   * This method presumes that the connector object has been configured, and it
+   * is thus able to communicate with the output data store should that be
+   * necessary.
+   *
+   * The document is converted to JSON, sent to the configured topic (default
+   * "topic"), and the method waits for the send to complete before reporting
+   * success.
+   *
+   * @param documentURI is the URI of the document. The URI is presumed to be
+   * the unique identifier which the output data store will use to process and
+   * serve the document. This URI is constructed by the repository connector
+   * which fetches the document, and is thus universal across all output
+   * connectors.
+   * @param outputDescription includes the description string that was
+   * constructed for this document by the getPipelineDescription() method.
+   * @param document is the document data to be processed (handed to the output
+   * data store).
+   * @param authorityNameString is the name of the authority responsible for
+   * authorizing any access tokens passed in with the repository document. May
+   * be null.
+   * @param activities is the handle to an object that the implementer of a
+   * pipeline connector may use to perform operations, such as logging
+   * processing activity, or sending a modified document to the next stage in
+   * the pipeline.
+   * @return the document status (accepted or permanently rejected).
+   * @throws ManifoldCFException with code INTERRUPTED if the thread is
+   * interrupted while waiting for the send, or a general error if the send
+   * fails.
+   * @throws IOException only if there's a stream error reading the document
+   * data.
+   */
+  @Override
+  public int addOrReplaceDocumentWithException(String documentURI, VersionContext outputDescription, RepositoryDocument document, String authorityNameString, IOutputAddActivity activities)
+          throws ManifoldCFException, ServiceInterruption, IOException {
+    try {
+      KafkaMessage kafkaMessage = new KafkaMessage();
+      // Get document info in JSON format
+      byte[] payload = kafkaMessage.createJSON(document);
+      String topic = getConfig(params, KafkaConfig.TOPIC, "topic");
+
+      ProducerRecord record = new ProducerRecord(topic, payload);
+      // Wait for the send to finish so that a failure is reported for this document
+      producer.send(record).get();
+    } catch (InterruptedException e) {
+      throw new ManifoldCFException("interrupted", e, ManifoldCFException.INTERRUPTED);
+    } catch (ExecutionException e) {
+      // The send itself failed: log the failure and surface it to the caller
+      // rather than recording the document as successfully ingested.
+      Throwable cause = (e.getCause() != null) ? e.getCause() : e;
+      String description = "Kafka send failed: " + cause.getMessage();
+      activities.recordActivity(null, INGEST_ACTIVITY, null, documentURI, "FAILED", description);
+      throw new ManifoldCFException(description, cause);
+    }
+
+    activities.recordActivity(null, INGEST_ACTIVITY, Long.valueOf(document.getBinaryLength()), documentURI, "OK", null);
+    return DOCUMENTSTATUS_ACCEPTED;
+  }
+
+  /**
+   * Look up a configuration parameter, falling back to a default.
+   *
+   * @param config is the configuration to read; may be null
+   * @param parameter is the name of the parameter to look up
+   * @param defaultValue is returned when config is null or the parameter is
+   * not set
+   * @return the parameter value, or defaultValue if there is none
+   */
+  private static String getConfig(ConfigParams config,
+          String parameter,
+          String defaultValue) {
+    final String value = (config == null) ? null : config.getParameter(parameter);
+    return (value == null) ? defaultValue : value;
+  }
+
+  /**
+   * Notify the connector of a completed job. This is meant to allow the
+   * connector to flush any internal data structures it has been keeping around,
+   * or to tell the output repository that this is a good time to synchronize
+   * things. It is called whenever a job is either completed or aborted.
+   *
+   * @param activities is the handle to an object that the implementer of an
+   * output connector may use to perform operations, such as logging processing
+   * activity.
+   */
+  @Override
+  public void noteJobComplete(IOutputNotifyActivity activities)
+          throws ManifoldCFException, ServiceInterruption {
+    // Record the completion with an empty entity identifier and an "OK" result
+    final String entityIdentifier = "";
+    final String resultCode = "OK";
+    activities.recordActivity(null, JOB_COMPLETE_ACTIVITY, null, entityIdentifier, resultCode, null);
+  }
+}

Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/Messages.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/Messages.java?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/Messages.java (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/java/org/apache/manifoldcf/agents/output/kafka/Messages.java Wed Oct 14 07:18:37 2015
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.manifoldcf.agents.output.kafka;
+
+import java.util.Locale;
+import java.util.Map;
+import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
+import org.apache.manifoldcf.core.interfaces.IHTTPOutput;
+
+/**
+ * Localized message and resource access for the Kafka output connector.
+ * Every method is a static convenience wrapper that delegates to the
+ * inherited method of the same name, supplying this class together with the
+ * connector's default bundle name and/or resource path.
+ */
+public class Messages extends org.apache.manifoldcf.ui.i18n.Messages {
+
+  // Bundle name passed to the message lookups below
+  public static final String DEFAULT_BUNDLE_NAME = "org.apache.manifoldcf.agents.output.kafka.common";
+  // Path name passed to the resource output methods below
+  public static final String DEFAULT_PATH_NAME = "org.apache.manifoldcf.agents.output.kafka";
+
+  /**
+   * Constructor - do not instantiate
+   */
+  protected Messages() {
+  }
+
+  // Lookups in the default bundle, without message arguments.
+  public static String getString(Locale locale, String messageKey) {
+    return getString(DEFAULT_BUNDLE_NAME, locale, messageKey, null);
+  }
+
+  public static String getAttributeString(Locale locale, String messageKey) {
+    return getAttributeString(DEFAULT_BUNDLE_NAME, locale, messageKey, null);
+  }
+
+  public static String getBodyString(Locale locale, String messageKey) {
+    return getBodyString(DEFAULT_BUNDLE_NAME, locale, messageKey, null);
+  }
+
+  public static String getAttributeJavascriptString(Locale locale, String messageKey) {
+    return getAttributeJavascriptString(DEFAULT_BUNDLE_NAME, locale, messageKey, null);
+  }
+
+  public static String getBodyJavascriptString(Locale locale, String messageKey) {
+    return getBodyJavascriptString(DEFAULT_BUNDLE_NAME, locale, messageKey, null);
+  }
+
+  // Lookups in the default bundle, with message arguments.
+  public static String getString(Locale locale, String messageKey, Object[] args) {
+    return getString(DEFAULT_BUNDLE_NAME, locale, messageKey, args);
+  }
+
+  public static String getAttributeString(Locale locale, String messageKey, Object[] args) {
+    return getAttributeString(DEFAULT_BUNDLE_NAME, locale, messageKey, args);
+  }
+
+  public static String getBodyString(Locale locale, String messageKey, Object[] args) {
+    return getBodyString(DEFAULT_BUNDLE_NAME, locale, messageKey, args);
+  }
+
+  public static String getAttributeJavascriptString(Locale locale, String messageKey, Object[] args) {
+    return getAttributeJavascriptString(DEFAULT_BUNDLE_NAME, locale, messageKey, args);
+  }
+
+  public static String getBodyJavascriptString(Locale locale, String messageKey, Object[] args) {
+    return getBodyJavascriptString(DEFAULT_BUNDLE_NAME, locale, messageKey, args);
+  }
+
+  // More general methods which allow the bundle name to be specified; these
+  // pass Messages.class through to the inherited implementation.
+  public static String getString(String bundleName, Locale locale, String messageKey, Object[] args) {
+    return getString(Messages.class, bundleName, locale, messageKey, args);
+  }
+
+  public static String getAttributeString(String bundleName, Locale locale, String messageKey, Object[] args) {
+    return getAttributeString(Messages.class, bundleName, locale, messageKey, args);
+  }
+
+  public static String getBodyString(String bundleName, Locale locale, String messageKey, Object[] args) {
+    return getBodyString(Messages.class, bundleName, locale, messageKey, args);
+  }
+
+  public static String getAttributeJavascriptString(String bundleName, Locale locale, String messageKey, Object[] args) {
+    return getAttributeJavascriptString(Messages.class, bundleName, locale, messageKey, args);
+  }
+
+  public static String getBodyJavascriptString(String bundleName, Locale locale, String messageKey, Object[] args) {
+    return getBodyJavascriptString(Messages.class, bundleName, locale, messageKey, args);
+  }
+
+  // Resource output: each method passes Messages.class and DEFAULT_PATH_NAME
+  // (and, for the Velocity variants, DEFAULT_BUNDLE_NAME) to the inherited method.
+  public static void outputResource(IHTTPOutput output, Locale locale, String resourceKey,
+          Map<String, String> substitutionParameters, boolean mapToUpperCase)
+          throws ManifoldCFException {
+    outputResource(output, Messages.class, DEFAULT_PATH_NAME, locale, resourceKey,
+            substitutionParameters, mapToUpperCase);
+  }
+
+  public static void outputResourceWithVelocity(IHTTPOutput output, Locale locale, String resourceKey,
+          Map<String, String> substitutionParameters, boolean mapToUpperCase)
+          throws ManifoldCFException {
+    outputResourceWithVelocity(output, Messages.class, DEFAULT_BUNDLE_NAME, DEFAULT_PATH_NAME, locale, resourceKey,
+            substitutionParameters, mapToUpperCase);
+  }
+
+  public static void outputResourceWithVelocity(IHTTPOutput output, Locale locale, String resourceKey,
+          Map<String, Object> contextObjects)
+          throws ManifoldCFException {
+    outputResourceWithVelocity(output, Messages.class, DEFAULT_BUNDLE_NAME, DEFAULT_PATH_NAME, locale, resourceKey,
+            contextObjects);
+  }
+
+}

Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_en_US.properties
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_en_US.properties?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_en_US.properties (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_en_US.properties Wed Oct 14 07:18:37 2015
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Add stuff as needed
+KafkaConnector.Parameters=Parameters
+
+KafkaConnector.IPColon=IP:
+KafkaConnector.PortColon=Port:
+KafkaConnector.TopicColon=Topic:
+
+KafkaConnector.PleaseSupplyValidIP=Please supply a valid Kafka IP
+KafkaConnector.PleaseSupplyValidPort=Please supply a valid port
+KafkaConnector.PleaseSupplyValidTopic=Please supply a valid topic name
+
+KafkaConnector.Kafka=Kafka

Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_ja_JP.properties
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_ja_JP.properties?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_ja_JP.properties (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_ja_JP.properties Wed Oct 14 07:18:37 2015
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Add stuff as needed

Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_zh_CN.properties
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_zh_CN.properties?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_zh_CN.properties (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/kafka/common_zh_CN.properties Wed Oct 14 07:18:37 2015
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Add stuff as needed

Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/editConfiguration.js
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/editConfiguration.js?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/editConfiguration.js (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/editConfiguration.js Wed Oct 14 07:18:37 2015
@@ -0,0 +1,73 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<script type="text/javascript">
+<!--
+function checkConfig() {
+  if (editconnection.ip) {
+    if (editconnection.ip.value == "") {
+      alert("$Encoder.bodyJavascriptEscape($ResourceBundle.getString('KafkaConnector.PleaseSupplyValidKafkaLocation'))");
+      editconnection.ip.focus();
+      return false;
+    }
+  }
+  if (editconnection.port) {
+    if (editconnection.port.value == "") {
+      alert("$Encoder.bodyJavascriptEscape($ResourceBundle.getString('KafkaConnector.PleaseSupplyValidPort'))");
+      editconnection.port.focus();
+      return false;
+    }
+  }
+  if (editconnection.topic) {
+    if (editconnection.topic.value == "") {
+      alert("$Encoder.bodyJavascriptEscape($ResourceBundle.getString('KafkaConnector.PleaseSupplyValidTopic'))");
+      editconnection.topic.focus();
+      return false;
+    }
+  }
+  return true;
+}
+
+function checkConfigForSave() {
+  if (editconnection.ip) {
+    if (editconnection.ip.value == "") {
+      alert("$Encoder.bodyJavascriptEscape($ResourceBundle.getString('KafkaConnector.PleaseSupplyValidIP'))");
+      SelectTab("$Encoder.javascriptBodyEscape($ResourceBundle.getString('KafkaConnector.Parameters'))");
+      editconnection.ip.focus();
+      return false;
+    }
+  }
+  if (editconnection.port) {
+    if (editconnection.port.value == "") {
+      alert("$Encoder.bodyJavascriptEscape($ResourceBundle.getString('KafkaConnector.PleaseSupplyValidPort'))");
+      SelectTab("$Encoder.javascriptBodyEscape($ResourceBundle.getString('KafkaConnector.Parameters'))");
+      editconnection.port.focus();
+      return false;
+    }
+  }
+  if (editconnection.topic) {
+    if (editconnection.topic.value == "") {
+      alert("$Encoder.bodyJavascriptEscape($ResourceBundle.getString('KafkaConnector.PleaseSupplyValidTopic'))");
+      SelectTab("$Encoder.javascriptBodyEscape($ResourceBundle.getString('KafkaConnector.Parameters'))");
+      editconnection.topic.focus();
+      return false;
+    }
+  }
+  return true;
+}
+//-->
+</script>

Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/editConfiguration_Parameters.html
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/editConfiguration_Parameters.html?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/editConfiguration_Parameters.html (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/editConfiguration_Parameters.html Wed Oct 14 07:18:37 2015
@@ -0,0 +1,48 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+#if($TABNAME == $ResourceBundle.getString('KafkaConnector.Parameters'))
+
+<table class="displaytable">
+    <tr>
+        <td class="description">
+    <nobr>$Encoder.bodyEscape($ResourceBundle.getString('KafkaConnector.IPColon'))</nobr>
+
+</td>
+<td class="value"><input name="ip" type="text"
+                         value="$Encoder.attributeEscape($IP)" size="48" />
+</td>
+</tr>
+<tr>
+    <td class="description"><nobr>$Encoder.bodyEscape($ResourceBundle.getString('KafkaConnector.PortColon'))</nobr></td>
+<td class="value"><input name="port" type="text" value="$Encoder.attributeEscape($PORT)"
+                         size="24" /></td>
+</tr>
+<tr>
+    <td class="description"><nobr>$Encoder.bodyEscape($ResourceBundle.getString('KafkaConnector.TopicColon'))</nobr></td>
+<td class="value"><input name="topic" type="text" value="$Encoder.attributeEscape($TOPIC)"
+                         size="24" /></td>
+</tr>
+</table>
+
+#else
+
+<input type="hidden" name="ip" value="$Encoder.attributeEscape($IP)" />
+<input type="hidden" name="port" value="$Encoder.attributeEscape($PORT)" />
+<input type="hidden" name="topic" value="$Encoder.attributeEscape($TOPIC)" />
+
+#end

Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/viewConfiguration.html
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/viewConfiguration.html?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/viewConfiguration.html (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/main/resources/org/apache/manifoldcf/agents/output/kafka/viewConfiguration.html Wed Oct 14 07:18:37 2015
@@ -0,0 +1,32 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<table class="displaytable">
+    <tr>
+        <td class="description"><nobr>$Encoder.bodyEscape($ResourceBundle.getString('KafkaConnector.IPColon'))</nobr></td>
+
+    <td class="value">$Encoder.bodyEscape($IP)</td>
+</tr>
+<tr>
+    <td class="description"><nobr>$Encoder.bodyEscape($ResourceBundle.getString('KafkaConnector.PortColon'))</nobr></td>
+<td class="value">$Encoder.bodyEscape($PORT)</td>
+</tr>
+<tr>
+    <td class="description"><nobr>$Encoder.bodyEscape($ResourceBundle.getString('KafkaConnector.TopicColon'))</nobr></td>
+<td class="value">$Encoder.bodyEscape($TOPIC)</td>
+</tr>
+</table>
\ No newline at end of file

Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/APISanityHSQLDBIT.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/APISanityHSQLDBIT.java?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/APISanityHSQLDBIT.java (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/APISanityHSQLDBIT.java Wed Oct 14 07:18:37 2015
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.manifoldcf.agents.output.kafka;
+
+import org.apache.manifoldcf.core.interfaces.Configuration;
+import org.apache.manifoldcf.core.interfaces.ConfigurationNode;
+import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
+import org.apache.manifoldcf.crawler.system.ManifoldCF;
+import org.junit.Test;
+
+/**
+ * Integration sanity check for the Kafka output connector, driven through the
+ * API helper methods this class inherits via {@link BaseITHSQLDB}.
+ *
+ * <p>The single test creates a repository connection, a Kafka output connection
+ * and a job that links the two, runs the job once, verifies the number of
+ * documents the job reports as processed and finally deletes the job.</p>
+ *
+ * @author tugba
+ */
+public class APISanityHSQLDBIT extends BaseITHSQLDB {
+
+  /** Class name of the repository connector used as the document source. */
+  private static final String REPOSITORY_CONNECTOR_CLASS =
+          "org.apache.manifoldcf.crawler.tests.TestingRepositoryConnector";
+  /** Class name of the output connector under test. */
+  private static final String OUTPUT_CONNECTOR_CLASS =
+          "org.apache.manifoldcf.agents.output.kafka.KafkaOutputConnector";
+  /** Number of documents the job is expected to report as processed. */
+  private static final long EXPECTED_DOCUMENT_COUNT = 3L;
+  /** Maximum time, in milliseconds, to wait for the job to finish or to disappear. */
+  private static final long JOB_TIMEOUT_MS = 120000L;
+  /** Pause, in milliseconds, between two consecutive job status polls. */
+  private static final long POLL_INTERVAL_MS = 1000L;
+
+  /**
+   * Creates both connections and the job, runs the job to completion, checks the
+   * processed document count and deletes the job again.
+   *
+   * @throws Exception if an API response contains an error node, the job does
+   *         not reach the expected state in time, or the document count is wrong
+   */
+  @Test
+  public void sanityCheck()
+          throws Exception {
+    // Repository connection for the testing connector; its configuration stays empty.
+    ConfigurationNode repositoryConnection = newConnectionNode("repositoryconnection",
+            "Test Connection", REPOSITORY_CONNECTOR_CLASS, "Test Connection", "10");
+    appendNode(repositoryConnection, new ConfigurationNode("configuration"));
+    throwIfErrorNode(performAPIPutOperationViaNodes(
+            "repositoryconnections/Test%20Connection", 201, singleNodeRequest(repositoryConnection)));
+
+    // Kafka output connection with its ip, port and topic parameters.
+    ConfigurationNode outputConnection = newConnectionNode("outputconnection",
+            "Kafka", OUTPUT_CONNECTOR_CLASS, "Kafka Connection", "100");
+    ConfigurationNode outputConfiguration = new ConfigurationNode("configuration");
+    appendParameterNode(outputConfiguration, "ip", "localhost");
+    appendParameterNode(outputConfiguration, "port", "9092");
+    appendParameterNode(outputConfiguration, "topic", "topic");
+    appendNode(outputConnection, outputConfiguration);
+    throwIfErrorNode(performAPIPutOperationViaNodes(
+            "outputconnections/Kafka", 201, singleNodeRequest(outputConnection)));
+
+    // Manually started, scan-once job whose only pipeline stage is the Kafka output.
+    ConfigurationNode job = new ConfigurationNode("job");
+    appendValueNode(job, "description", "Test Job");
+    appendValueNode(job, "repository_connection", "Test Connection");
+    ConfigurationNode pipelineStage = new ConfigurationNode("pipelinestage");
+    appendValueNode(pipelineStage, "stage_id", "0");
+    appendValueNode(pipelineStage, "stage_isoutput", "true");
+    appendValueNode(pipelineStage, "stage_connectionname", "Kafka");
+    appendNode(job, pipelineStage);
+    appendValueNode(job, "run_mode", "scan once");
+    appendValueNode(job, "start_mode", "manual");
+    appendValueNode(job, "hopcount_mode", "accurate");
+    appendNode(job, new ConfigurationNode("document_specification"));
+    String jobIDString = postJobAndGetId(job);
+
+    // Now, start the job, and wait until it completes.
+    startJob(jobIDString);
+    waitJobInactive(jobIDString, JOB_TIMEOUT_MS);
+
+    // Check to be sure we actually processed the expected number of documents.
+    long count = getJobDocumentsProcessed(jobIDString);
+    if (count != EXPECTED_DOCUMENT_COUNT) {
+      throw new ManifoldCFException("Wrong number of documents processed - expected "
+              + EXPECTED_DOCUMENT_COUNT + ", saw " + count);
+    }
+
+    // Now, delete the job.
+    deleteJob(jobIDString);
+    waitJobDeleted(jobIDString, JOB_TIMEOUT_MS);
+
+    // The connections are left in place on purpose; the original author notes
+    // that cleanup is handled by the base class.
+  }
+
+  /**
+   * Builds a connection node with name, class_name, description and
+   * max_connections children, in that order; the caller appends "configuration".
+   */
+  private static ConfigurationNode newConnectionNode(String type, String name, String className,
+          String description, String maxConnections)
+          throws Exception {
+    ConfigurationNode connection = new ConfigurationNode(type);
+    appendValueNode(connection, "name", name);
+    appendValueNode(connection, "class_name", className);
+    appendValueNode(connection, "description", description);
+    appendValueNode(connection, "max_connections", maxConnections);
+    return connection;
+  }
+
+  /** Appends {@code child} after the current last child of {@code parent}. */
+  private static void appendNode(ConfigurationNode parent, ConfigurationNode child)
+          throws Exception {
+    parent.addChild(parent.getChildCount(), child);
+  }
+
+  /** Appends a child of the given type that carries {@code value} as its value. */
+  private static void appendValueNode(ConfigurationNode parent, String type, String value)
+          throws Exception {
+    ConfigurationNode child = new ConfigurationNode(type);
+    child.setValue(value);
+    appendNode(parent, child);
+  }
+
+  /** Appends a "_PARAMETER_" child with the given "name" attribute and value. */
+  private static void appendParameterNode(ConfigurationNode configuration, String name, String value)
+          throws Exception {
+    ConfigurationNode parameter = new ConfigurationNode("_PARAMETER_");
+    parameter.setAttribute("name", name);
+    parameter.setValue(value);
+    appendNode(configuration, parameter);
+  }
+
+  /** Wraps a single node into a fresh request {@link Configuration}. */
+  private static Configuration singleNodeRequest(ConfigurationNode node)
+          throws Exception {
+    Configuration request = new Configuration();
+    request.addChild(0, node);
+    return request;
+  }
+
+  /** Throws an exception carrying the value of the first "error" node of {@code result}, if any. */
+  private static void throwIfErrorNode(Configuration result)
+          throws Exception {
+    for (int i = 0; i < result.getChildCount(); i++) {
+      ConfigurationNode resultNode = result.findChild(i);
+      if (resultNode.getType().equals("error")) {
+        throw new Exception(resultNode.getValue());
+      }
+    }
+  }
+
+  /**
+   * Posts the job definition and extracts the identifier assigned to it.
+   *
+   * @param job the "job" node to submit
+   * @return the value of the "job_id" node of the response
+   * @throws Exception if the response contains an "error" node or no "job_id" node
+   */
+  private String postJobAndGetId(ConfigurationNode job)
+          throws Exception {
+    Configuration result = performAPIPostOperationViaNodes("jobs", 201, singleNodeRequest(job));
+    String jobIDString = null;
+    for (int i = 0; i < result.getChildCount(); i++) {
+      ConfigurationNode resultNode = result.findChild(i);
+      if (resultNode.getType().equals("error")) {
+        throw new Exception(resultNode.getValue());
+      } else if (resultNode.getType().equals("job_id")) {
+        jobIDString = resultNode.getValue();
+      }
+    }
+    if (jobIDString == null) {
+      throw new Exception("Missing job_id from return!");
+    }
+    return jobIDString;
+  }
+
+  /**
+   * Requests a start of the given job.
+   *
+   * @param jobIDString identifier of the job
+   * @throws Exception if the response contains an "error" node
+   */
+  protected void startJob(String jobIDString)
+          throws Exception {
+    throwIfErrorNode(performAPIPutOperationViaNodes("start/" + jobIDString, 201, new Configuration()));
+  }
+
+  /**
+   * Requests deletion of the given job.
+   *
+   * @param jobIDString identifier of the job
+   * @throws Exception if the response contains an "error" node
+   */
+  protected void deleteJob(String jobIDString)
+          throws Exception {
+    throwIfErrorNode(performAPIDeleteOperationViaNodes("jobs/" + jobIDString, 200));
+  }
+
+  /**
+   * Reads the "status" field of the job's status record.
+   *
+   * @param jobIDString identifier of the job
+   * @return the status value, or {@code null} if the response has no such field
+   * @throws Exception if the response contains an "error" node
+   */
+  protected String getJobStatus(String jobIDString)
+          throws Exception {
+    return readJobStatusField(jobIDString, "status");
+  }
+
+  /**
+   * Reads the "documents_processed" field of the job's status record.
+   *
+   * @param jobIDString identifier of the job
+   * @return the number of processed documents reported for the job
+   * @throws Exception if the response contains an "error" node or lacks the field
+   */
+  protected long getJobDocumentsProcessed(String jobIDString)
+          throws Exception {
+    String documentsProcessed = readJobStatusField(jobIDString, "documents_processed");
+    if (documentsProcessed == null) {
+      throw new Exception("Expected a documents_processed field, didn't find it");
+    }
+    return Long.parseLong(documentsProcessed);
+  }
+
+  /**
+   * Fetches the job's status record and returns the value of the last child of
+   * its "jobstatus" node that has the given type, or {@code null} if none exists.
+   */
+  private String readJobStatusField(String jobIDString, String fieldType)
+          throws Exception {
+    Configuration result = performAPIGetOperationViaNodes("jobstatuses/" + jobIDString, 200);
+    String value = null;
+    for (int i = 0; i < result.getChildCount(); i++) {
+      ConfigurationNode resultNode = result.findChild(i);
+      if (resultNode.getType().equals("error")) {
+        throw new Exception(resultNode.getValue());
+      } else if (resultNode.getType().equals("jobstatus")) {
+        for (int j = 0; j < resultNode.getChildCount(); j++) {
+          ConfigurationNode childNode = resultNode.findChild(j);
+          if (childNode.getType().equals(fieldType)) {
+            value = childNode.getValue();
+          }
+        }
+      }
+    }
+    return value;
+  }
+
+  /**
+   * Polls the job status once per poll interval until the job reports "done".
+   *
+   * @param jobIDString identifier of the job
+   * @param maxTime maximum time to wait, in milliseconds
+   * @throws Exception if the job is unknown, was never started, reports an
+   *         error, or does not finish within {@code maxTime}
+   */
+  protected void waitJobInactive(String jobIDString, long maxTime)
+          throws Exception {
+    long startTime = System.currentTimeMillis();
+    while (System.currentTimeMillis() < startTime + maxTime) {
+      String status = getJobStatus(jobIDString);
+      if (status == null) {
+        throw new Exception("No such job: '" + jobIDString + "'");
+      }
+      if (status.equals("not yet run")) {
+        throw new Exception("Job was never started.");
+      }
+      if (status.equals("done")) {
+        return;
+      }
+      if (status.equals("error")) {
+        throw new Exception("Job reports error.");
+      }
+      ManifoldCF.sleep(POLL_INTERVAL_MS);
+    }
+    throw new ManifoldCFException("ManifoldCF did not terminate in the allotted time of " + maxTime + " milliseconds");
+  }
+
+  /**
+   * Polls the job status once per poll interval until no status is returned any more.
+   *
+   * @param jobIDString identifier of the job
+   * @param maxTime maximum time to wait, in milliseconds
+   * @throws Exception if the job still has a status after {@code maxTime}
+   */
+  protected void waitJobDeleted(String jobIDString, long maxTime)
+          throws Exception {
+    long startTime = System.currentTimeMillis();
+    while (System.currentTimeMillis() < startTime + maxTime) {
+      if (getJobStatus(jobIDString) == null) {
+        return;
+      }
+      ManifoldCF.sleep(POLL_INTERVAL_MS);
+    }
+    throw new ManifoldCFException("ManifoldCF did not delete in the allotted time of " + maxTime + " milliseconds");
+  }
+
+}

Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseHSQLDB.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseHSQLDB.java?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseHSQLDB.java (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseHSQLDB.java Wed Oct 14 07:18:37 2015
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.manifoldcf.agents.output.kafka;
+
+/** Testing base class for the agents framework that names the Kafka output connector to use. */
+public class BaseHSQLDB extends org.apache.manifoldcf.crawler.tests.ConnectorBaseHSQLDB {
+
+  private static final String OUTPUT_NAME = "Kafka";
+  private static final String OUTPUT_CLASS = "org.apache.manifoldcf.agents.output.kafka.KafkaOutputConnector";
+
+  protected String[] getOutputNames() {
+    return new String[]{OUTPUT_NAME};
+  }
+
+  protected String[] getOutputClasses() {
+    return new String[]{OUTPUT_CLASS};
+  }
+
+}

Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseITHSQLDB.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseITHSQLDB.java?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseITHSQLDB.java (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseITHSQLDB.java Wed Oct 14 07:18:37 2015
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.manifoldcf.agents.output.kafka;
+
+import java.util.Properties;
+import org.junit.After;
+import static org.junit.Assert.fail;
+import org.junit.Before;
+
+/** Integration-test base class that runs a local Kafka broker (plus ZooKeeper) around each test. */
+public class BaseITHSQLDB extends org.apache.manifoldcf.crawler.tests.BaseITHSQLDB {
+
+  /** Broker started by {@link #setupKafka()}; {@code null} while none is running. */
+  static KafkaLocal kafka;
+
+  protected String[] getConnectorNames() {
+    return new String[]{"CMIS"};
+  }
+
+  protected String[] getConnectorClasses() {
+    return new String[]{"org.apache.manifoldcf.crawler.tests.TestingRepositoryConnector"};
+  }
+
+  protected String[] getOutputNames() {
+    return new String[]{"Kafka"};
+  }
+
+  protected String[] getOutputClasses() {
+    return new String[]{"org.apache.manifoldcf.agents.output.kafka.KafkaOutputConnector"};
+  }
+
+  /** Starts a local ZooKeeper and Kafka broker before each test. */
+  @Before
+  public void setupKafka()
+          throws Exception {
+    Properties kafkaProperties = new Properties();
+    kafkaProperties.put("broker.id", "0");
+    kafkaProperties.put("port", "9092");
+    kafkaProperties.put("num.network.threads", "3");
+    kafkaProperties.put("num.io.threads", "8");
+    kafkaProperties.put("socket.send.buffer.bytes", "102400");
+    kafkaProperties.put("socket.receive.buffer.bytes", "102400");
+    kafkaProperties.put("socket.request.max.bytes", "104857600");
+    kafkaProperties.put("log.dirs", "/tmp/kafka-logs");
+    kafkaProperties.put("num.partitions", "1");
+    kafkaProperties.put("num.recovery.threads.per.data.dir", "1");
+    kafkaProperties.put("log.retention.hours", "168");
+    kafkaProperties.put("log.segment.bytes", "1073741824");
+    kafkaProperties.put("log.retention.check.interval.ms", "300000");
+    kafkaProperties.put("log.cleaner.enable", "false");
+    kafkaProperties.put("zookeeper.connect", "localhost:2181");
+    kafkaProperties.put("zookeeper.connection.timeout.ms", "6000");
+
+    Properties zkProperties = new Properties();
+    zkProperties.put("dataDir", "/tmp/zookeeper");
+    zkProperties.put("clientPort", "2181");
+    zkProperties.put("maxClientCnxns", "0");
+
+    System.out.println("Kafka is starting...");
+    try {
+      kafka = new KafkaLocal(kafkaProperties, zkProperties);
+      // Presumably gives the broker time to finish starting before the test uses it.
+      Thread.sleep(5000);
+    } catch (Exception e) {
+      // Keep the cause: fail(message) alone would discard why the broker did not start.
+      throw new AssertionError("Error running local Kafka broker", e);
+    }
+  }
+
+  /** Stops the broker if one is running, so a failed setup is not masked by a NullPointerException. */
+  @After
+  public void cleanUpKafka() {
+    if (kafka != null) {
+      kafka.stop();
+      kafka = null;
+    }
+  }
+}

Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BasePostgresql.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BasePostgresql.java?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BasePostgresql.java (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BasePostgresql.java Wed Oct 14 07:18:37 2015
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.manifoldcf.agents.output.kafka;
+
+/**
+ * Testing base class for the PostgreSQL-backed connector tests that names the
+ * Kafka output connector to use.
+ */
+public class BasePostgresql extends org.apache.manifoldcf.crawler.tests.ConnectorBasePostgresql {
+
+  private static final String OUTPUT_NAME = "Kafka";
+  private static final String OUTPUT_CLASS = "org.apache.manifoldcf.agents.output.kafka.KafkaOutputConnector";
+
+  protected String[] getOutputNames() {
+    return new String[]{OUTPUT_NAME};
+  }
+
+  protected String[] getOutputClasses() {
+    return new String[]{OUTPUT_CLASS};
+  }
+
+}

Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseUIHSQLDB.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseUIHSQLDB.java?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseUIHSQLDB.java (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/BaseUIHSQLDB.java Wed Oct 14 07:18:37 2015
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.manifoldcf.agents.output.kafka;
+
+/**
+ * Testing base class for UI tests that names the testing repository connector
+ * and the Kafka output connector to use.
+ */
+public class BaseUIHSQLDB extends org.apache.manifoldcf.crawler.tests.ConnectorBaseUIHSQLDB {
+
+  private static final String CONNECTOR_NAME = "Test Connector";
+  private static final String CONNECTOR_CLASS = "org.apache.manifoldcf.crawler.tests.TestingRepositoryConnector";
+  private static final String OUTPUT_NAME = "Kafka";
+  private static final String OUTPUT_CLASS = "org.apache.manifoldcf.agents.output.kafka.KafkaOutputConnector";
+
+  protected String[] getConnectorNames() {
+    return new String[]{CONNECTOR_NAME};
+  }
+
+  protected String[] getConnectorClasses() {
+    return new String[]{CONNECTOR_CLASS};
+  }
+
+  protected String[] getOutputNames() {
+    return new String[]{OUTPUT_NAME};
+  }
+
+  protected String[] getOutputClasses() {
+    return new String[]{OUTPUT_CLASS};
+  }
+
+}

Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/KafkaConnectorTest.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/KafkaConnectorTest.java?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/KafkaConnectorTest.java (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/KafkaConnectorTest.java Wed Oct 14 07:18:37 2015
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.manifoldcf.agents.output.kafka;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.manifoldcf.agents.interfaces.IOutputAddActivity;
+import org.apache.commons.lang3.concurrent.ConcurrentUtils;
+
+import org.apache.manifoldcf.agents.interfaces.RepositoryDocument;
+import org.apache.manifoldcf.core.interfaces.VersionContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import org.mockito.runners.MockitoJUnitRunner;
+
+/**
+ * Unit test for {@link KafkaOutputConnector} that hands the connector a mocked
+ * Kafka producer and checks that adding a document results in a send call.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaConnectorTest {
+
+  /** Mocked producer handed to the connector under test. */
+  @Mock
+  private KafkaProducer producer;
+
+  /** Connector under test; created anew before each test. */
+  private KafkaOutputConnector connector;
+
+  /** Creates the connector, gives it the mocked producer and stubs {@code send}. */
+  @Before
+  public void setup() throws Exception {
+    connector = new KafkaOutputConnector();
+    connector.setProducer(producer);
+
+    Mockito.when(producer.send(Mockito.any(ProducerRecord.class)))
+            .thenReturn(ConcurrentUtils.constantFuture(true));
+  }
+
+  /** Adds one document through the connector and verifies the producer was asked to send a record. */
+  @Test
+  public void whenSendingDocumenttoKafka() throws Exception {
+    RepositoryDocument document = new RepositoryDocument();
+    // NOTE(review): the mime type contains an apostrophe (text'/plain); confirm this is intentional.
+    document.setMimeType("text\'/plain");
+    document.setFileName("test.txt");
+
+    // The JSON body is also built directly here; its result is not asserted on.
+    new KafkaMessage().createJSON(document);
+
+    IOutputAddActivity activities = Mockito.mock(IOutputAddActivity.class);
+    VersionContext version = Mockito.mock(VersionContext.class);
+
+    connector.addOrReplaceDocumentWithException("", version, document, "", activities);
+    Mockito.verify(producer).send(Mockito.any(ProducerRecord.class));
+  }
+}

Added: manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/KafkaLocal.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/KafkaLocal.java?rev=1708553&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/KafkaLocal.java (added)
+++ manifoldcf/branches/CONNECTORS-1162/connectors/kafka/connector/src/test/java/org/apache/manifoldcf/agents/output/kafka/KafkaLocal.java Wed Oct 14 07:18:37 2015
@@ -0,0 +1,36 @@
+package org.apache.manifoldcf.agents.output.kafka;
+
+import java.io.IOException;
+import java.util.Properties;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+
+/**
+ * Test helper that creates a {@link ZooKeeperLocal} and starts a Kafka broker,
+ * reporting its progress on standard output.
+ */
+public class KafkaLocal {
+
+  /** Broker created and started by the constructor. */
+  public KafkaServerStartable kafka;
+  /** ZooKeeper helper created by the constructor. */
+  public ZooKeeperLocal zookeeper;
+
+  /**
+   * Creates the ZooKeeper helper first, then creates and starts the Kafka broker.
+   *
+   * @param kafkaProperties settings used to build the broker's {@link KafkaConfig}
+   * @param zkProperties settings handed to {@link ZooKeeperLocal}
+   * @throws IOException declared by this constructor; presumably raised while ZooKeeper is set up
+   * @throws InterruptedException declared by this constructor; presumably raised while ZooKeeper is set up
+   */
+  public KafkaLocal(Properties kafkaProperties, Properties zkProperties) throws IOException, InterruptedException {
+    final KafkaConfig brokerConfig = new KafkaConfig(kafkaProperties);
+
+    report("starting local zookeeper...");
+    this.zookeeper = new ZooKeeperLocal(zkProperties);
+    report("done");
+
+    this.kafka = new KafkaServerStartable(brokerConfig);
+    report("starting local kafka broker...");
+    this.kafka.startup();
+    report("done");
+  }
+
+  /** Shuts the Kafka broker down; the ZooKeeper helper is not touched here. */
+  public void stop() {
+    report("stopping kafka...");
+    this.kafka.shutdown();
+    report("done");
+  }
+
+  /** Writes one progress line to standard output. */
+  private static void report(String message) {
+    System.out.println(message);
+  }
+
+}