You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ti...@apache.org on 2022/12/10 16:59:22 UTC
[pulsar] branch master updated: [feat][io] Add Alluxio sink connector (#3823)
This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b0695fb2192 [feat][io] Add Alluxio sink connector (#3823)
b0695fb2192 is described below
commit b0695fb2192e04b9d8c522a1944a3b8e18755e15
Author: Fangbin Sun <su...@gmail.com>
AuthorDate: Sun Dec 11 00:59:16 2022 +0800
[feat][io] Add Alluxio sink connector (#3823)
---
pulsar-io/alluxio/pom.xml | 118 ++++++++
.../pulsar/io/alluxio/AlluxioAbstractConfig.java | 76 +++++
.../apache/pulsar/io/alluxio/sink/AlluxioSink.java | 337 +++++++++++++++++++++
.../pulsar/io/alluxio/sink/AlluxioSinkConfig.java | 112 +++++++
.../resources/META-INF/services/pulsar-io.yaml | 21 ++
.../io/alluxio/sink/AlluxioSinkConfigTest.java | 168 ++++++++++
.../pulsar/io/alluxio/sink/AlluxioSinkTest.java | 220 ++++++++++++++
.../org/apache/pulsar/io/alluxio/sink/Foobar.java | 32 ++
.../alluxio/src/test/resources/sinkConfig.yaml | 29 ++
pulsar-io/pom.xml | 1 +
site2/docs/io-alluxio.md | 62 ++++
11 files changed, 1176 insertions(+)
diff --git a/pulsar-io/alluxio/pom.xml b/pulsar-io/alluxio/pom.xml
new file mode 100644
index 00000000000..e8a5436d612
--- /dev/null
+++ b/pulsar-io/alluxio/pom.xml
@@ -0,0 +1,118 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>pulsar-io</artifactId>
+ <groupId>org.apache.pulsar</groupId>
+ <version>2.11.0-SNAPSHOT</version>
+ </parent>
+
+ <properties>
+ <alluxio.version>2.5.0</alluxio.version>
+ <metrics.version>4.1.11</metrics.version>
+ <grpc.version>1.37.0</grpc.version>
+ </properties>
+
+ <artifactId>pulsar-io-alluxio</artifactId>
+ <name>Pulsar IO :: Alluxio</name>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client-original</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.alluxio</groupId>
+ <artifactId>alluxio-core-client-fs</artifactId>
+ <version>${alluxio.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>grpc-netty</artifactId>
+ <groupId>io.grpc</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.alluxio</groupId>
+ <artifactId>alluxio-minicluster</artifactId>
+ <version>${alluxio.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.glassfish</groupId>
+ <artifactId>javax.el</artifactId>
+ </exclusion>
+ <exclusion>
+ <artifactId>grpc-netty</artifactId>
+ <groupId>io.grpc</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <!-- Alluxio's gRPC dependency needs a higher grpc-netty version -->
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-jvm</artifactId>
+ <version>${metrics.version}</version>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-nar-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/AlluxioAbstractConfig.java b/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/AlluxioAbstractConfig.java
new file mode 100644
index 00000000000..96fe38a5893
--- /dev/null
+++ b/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/AlluxioAbstractConfig.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.alluxio;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.io.Serializable;
+
+/**
+ * Connection settings shared by the Alluxio connector configurations: the Alluxio master
+ * address, the working directory and the optional login user.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public class AlluxioAbstractConfig implements Serializable {
+
+    private static final long serialVersionUID = 3727671407445918309L;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The hostname of Alluxio master")
+    private String alluxioMasterHost;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "19998",
+        help = "The port that Alluxio master node runs on")
+    private int alluxioMasterPort = 19998;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The Alluxio directory from which files should be read from or written to")
+    private String alluxioDir;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "When `alluxio.security.authentication.type` is set to `SIMPLE` or `CUSTOM`, user application uses"
+            + " this property to indicate the user requesting Alluxio service. If it is not set explicitly,"
+            + " the OS login user is used")
+    private String securityLoginUser;
+
+    /**
+     * Checks that the required connection properties are present and usable.
+     *
+     * @throws NullPointerException if {@code alluxioMasterHost} or {@code alluxioDir} is not set
+     * @throws IllegalArgumentException if {@code alluxioMasterPort} is not a valid port number
+     */
+    public void validate() {
+        Preconditions.checkNotNull(alluxioMasterHost, "alluxioMasterHost property not set.");
+        // alluxioMasterPort is a primitive int, so a null check can never fail; check its range instead.
+        Preconditions.checkArgument(alluxioMasterPort > 0 && alluxioMasterPort <= 65535,
+                "alluxioMasterPort must be a valid port number (1-65535).");
+        Preconditions.checkNotNull(alluxioDir, "alluxioDir property not set.");
+    }
+}
diff --git a/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSink.java b/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSink.java
new file mode 100644
index 00000000000..87cc643a2bc
--- /dev/null
+++ b/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSink.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.alluxio.sink;
+
+import alluxio.AlluxioURI;
+import alluxio.client.WriteType;
+import alluxio.client.file.FileOutStream;
+import alluxio.client.file.FileSystem;
+import alluxio.conf.InstancedConfiguration;
+import alluxio.conf.PropertyKey;
+import alluxio.exception.AlluxioException;
+import alluxio.grpc.CreateFilePOptions;
+import alluxio.grpc.WritePType;
+import alluxio.util.FileSystemOptions;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Alluxio sink that writes the value of each incoming message to files in an
+ * Alluxio directory; message keys are ignored.
+ */
+@Connector(
+ name = "alluxio",
+ type = IOType.SINK,
+ help = "The sink connector is used for moving records from Pulsar to Alluxio.",
+ configClass = AlluxioSinkConfig.class)
+@Slf4j
+public class AlluxioSink implements Sink<GenericObject> {
+
+ // Alluxio client, created in open()
+ private FileSystem fileSystem;
+ // output stream of the tmp file currently being written; null when no tmp file is open
+ private FileOutStream fileOutStream;
+ // file creation options derived from the Alluxio configuration in open()
+ private CreateFilePOptions.Builder optionsBuilder;
+ // number of records written to the current tmp file; reset when the file is committed
+ private long recordsNum;
+ // path of the current tmp file; empty string when there is none
+ private String tmpFilePath;
+ // target directory that committed files are renamed into
+ private String fileDirPath;
+ // "<fileDirPath>/tmp", where files are written before being committed
+ private String tmpFileDirPath;
+ // System.currentTimeMillis() of the last commit (or of open())
+ private long lastRotationTime;
+ // rotation thresholds copied from the sink configuration in open()
+ private long rotationRecordsNum;
+ private long rotationInterval;
+ private AlluxioSinkConfig alluxioSinkConfig;
+ // current stage of the write/rotate/commit cycle driven by write()
+ private AlluxioState alluxioState;
+
+ private InstancedConfiguration configuration = InstancedConfiguration.defaults();
+
+ private ObjectMapper objectMapper = new ObjectMapper();
+
+ // records written to the current tmp file that have not been acked or failed yet
+ private List<Record<GenericObject>> recordsToAck;
+
+    /**
+     * Loads and validates the sink configuration, creates the Alluxio client and makes sure
+     * the target directory and its {@code tmp} sub-directory exist.
+     *
+     * @param config connector configuration as key/value pairs
+     * @param sinkContext the sink context (not used here)
+     * @throws Exception if the configuration is invalid or the Alluxio directories cannot be prepared
+     */
+    @Override
+    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+        alluxioSinkConfig = AlluxioSinkConfig.load(config);
+        alluxioSinkConfig.validate();
+
+        // initialize FileSystem
+        String alluxioMasterHost = alluxioSinkConfig.getAlluxioMasterHost();
+        int alluxioMasterPort = alluxioSinkConfig.getAlluxioMasterPort();
+        // The hostname must be set on the same configuration instance that is handed to the
+        // FileSystem factory below; setting it on a fresh defaults() instance would be lost.
+        configuration.set(PropertyKey.MASTER_HOSTNAME, alluxioMasterHost);
+        configuration.set(PropertyKey.MASTER_RPC_PORT, alluxioMasterPort);
+        if (alluxioSinkConfig.getSecurityLoginUser() != null) {
+            configuration.set(PropertyKey.SECURITY_LOGIN_USERNAME, alluxioSinkConfig.getSecurityLoginUser());
+        }
+        fileSystem = FileSystem.Factory.create(configuration);
+
+        // initialize alluxio dirs
+        String alluxioDir = alluxioSinkConfig.getAlluxioDir();
+        fileDirPath = alluxioDir.startsWith("/") ? alluxioDir : "/" + alluxioDir;
+        tmpFileDirPath = fileDirPath + "/tmp";
+
+        AlluxioURI alluxioDirPath = new AlluxioURI(fileDirPath);
+        if (!fileSystem.exists(alluxioDirPath)) {
+            fileSystem.createDirectory(alluxioDirPath);
+        }
+
+        AlluxioURI tmpAlluxioDirPath = new AlluxioURI(tmpFileDirPath);
+        if (!fileSystem.exists(tmpAlluxioDirPath)) {
+            fileSystem.createDirectory(tmpAlluxioDirPath);
+        }
+
+        optionsBuilder = FileSystemOptions.createFileDefaults(configuration).toBuilder();
+
+        recordsNum = 0;
+        recordsToAck = Lists.newArrayList();
+        tmpFilePath = "";
+        alluxioState = AlluxioState.WRITE_STARTED;
+
+        lastRotationTime = System.currentTimeMillis();
+        rotationRecordsNum = alluxioSinkConfig.getRotationRecords();
+        rotationInterval = alluxioSinkConfig.getRotationInterval();
+    }
+
+    /**
+     * Appends the record to the current tmp file and, once a rotation threshold is reached,
+     * commits the tmp file and acknowledges all records it contains.
+     *
+     * <p>A record that cannot be written is failed immediately. If committing the tmp file
+     * fails, every pending record is failed, the tmp file is deleted (best effort) and the
+     * sink starts over with a fresh tmp file on the next call.
+     *
+     * @param record the record to write
+     */
+    @Override
+    public void write(Record<GenericObject> record) {
+        long now = System.currentTimeMillis();
+
+        switch (alluxioState) {
+            case WRITE_STARTED:
+                try {
+                    writeToAlluxio(record);
+                    if (!shouldRotate(now)) {
+                        break;
+                    }
+                    alluxioState = AlluxioState.FILE_ROTATED;
+                } catch (AlluxioException | IOException e) {
+                    log.error("Unable to write record to alluxio.", e);
+                    record.fail();
+                    break;
+                }
+                // intentional fall-through: rotation is due, commit the tmp file right away
+            case FILE_ROTATED:
+                try {
+                    closeAndCommitTmpFile();
+                    alluxioState = AlluxioState.FILE_COMMITTED;
+                    ackRecords();
+                } catch (AlluxioException | IOException e) {
+                    log.error("Unable to flush records to alluxio.", e);
+                    failRecords();
+                    try {
+                        deleteTmpFile();
+                    } catch (AlluxioException | IOException e1) {
+                        // log the deletion failure itself, not the commit failure logged above
+                        log.error("Failed to delete tmp cache file.", e1);
+                    }
+                    // Start over with a fresh tmp file. Staying in FILE_ROTATED would make every
+                    // later call retry the commit of the discarded file without writing, acking
+                    // or failing the incoming record.
+                    fileOutStream = null;
+                    tmpFilePath = "";
+                    recordsNum = 0;
+                    alluxioState = AlluxioState.WRITE_STARTED;
+                    break;
+                }
+                // intentional fall-through: the file is committed, go back to writing
+            case FILE_COMMITTED:
+                alluxioState = AlluxioState.WRITE_STARTED;
+                break;
+            default:
+                log.error("{} is not a valid state when writing record to alluxio temp dir {}.",
+                    alluxioState, tmpFileDirPath);
+                break;
+        }
+    }
+
+    /**
+     * Flushes the records of the current tmp file, removes a leftover tmp file and releases
+     * the Alluxio client.
+     *
+     * @throws Exception if the leftover tmp file cannot be deleted or the client cannot be closed
+     */
+    @Override
+    public void close() throws Exception {
+        try {
+            // flush records in the tmpFile when closing sink
+            try {
+                closeAndCommitTmpFile();
+                ackRecords();
+            } catch (AlluxioException | IOException e) {
+                log.error("Unable to flush records to alluxio.", e);
+                failRecords();
+            }
+            deleteTmpFile();
+        } finally {
+            // the client was created by this sink in open(), so it has to be released here
+            if (fileSystem != null) {
+                fileSystem.close();
+            }
+        }
+    }
+
+    /** Acknowledges every pending record and empties the pending list. */
+    private void ackRecords() {
+        for (Record<GenericObject> pending : recordsToAck) {
+            pending.ack();
+        }
+        recordsToAck.clear();
+    }
+
+    /** Marks every pending record as failed and empties the pending list. */
+    private void failRecords() {
+        for (Record<GenericObject> pending : recordsToAck) {
+            pending.fail();
+        }
+        recordsToAck.clear();
+    }
+
+ private void writeToAlluxio(Record<GenericObject> record) throws AlluxioException, IOException {
+ KeyValue<String, String> keyValue = extractKeyValue(record);
+ if (fileOutStream == null) {
+ createTmpFile();
+ }
+ fileOutStream.write(toBytes(keyValue.getValue()));
+ if (alluxioSinkConfig.getLineSeparator() != '\u0000') {
+ fileOutStream.write(alluxioSinkConfig.getLineSeparator());
+ }
+ recordsNum++;
+ recordsToAck.add(record);
+ }
+
+    /**
+     * Creates a new, uniquely named tmp file in the tmp directory and opens it for writing.
+     *
+     * @throws AlluxioException if Alluxio rejects the file creation
+     * @throws IOException if the file cannot be created
+     * @throws IllegalArgumentException if the configured write type is not a known value
+     */
+    private void createTmpFile() throws AlluxioException, IOException {
+        UUID id = UUID.randomUUID();
+        String fileExtension = alluxioSinkConfig.getFileExtension();
+        String newTmpFilePath = tmpFileDirPath + "/" + id.toString() + "_tmp" + fileExtension;
+        String configuredWriteType = alluxioSinkConfig.getWriteType();
+        if (configuredWriteType != null) {
+            WritePType writePType;
+            try {
+                writePType = WritePType.valueOf(configuredWriteType.toUpperCase());
+            } catch (IllegalArgumentException e) {
+                // keep the original exception as the cause and name the offending value
+                throw new IllegalArgumentException("Illegal write type '" + configuredWriteType
+                        + "' when creating Alluxio files, valid values are: "
+                        + Arrays.asList(WriteType.values()), e);
+            }
+            optionsBuilder.setWriteType(writePType);
+        }
+        fileOutStream = fileSystem.createFile(new AlluxioURI(newTmpFilePath), optionsBuilder.build());
+        // publish the path only once the file exists, so cleanup never targets a file that was never created
+        tmpFilePath = newTmpFilePath;
+    }
+
+    /**
+     * Closes the current tmp file and commits it by renaming it to
+     * {@code <fileDirPath>/<filePrefix>-<timestamp><fileExtension>}, then resets the rotation
+     * counters. Does nothing when no tmp file is open.
+     *
+     * @throws AlluxioException if the rename is rejected by Alluxio
+     * @throws IOException if the tmp file cannot be closed or renamed
+     */
+    private void closeAndCommitTmpFile() throws AlluxioException, IOException {
+        if (fileOutStream == null) {
+            // Nothing has been written since the last commit; tmpFilePath is then empty and
+            // renaming it could not succeed.
+            return;
+        }
+        // close the tmpFile
+        fileOutStream.close();
+        // commit the tmpFile
+        String filePrefix = alluxioSinkConfig.getFilePrefix();
+        String fileExtension = alluxioSinkConfig.getFileExtension();
+        String newFile = filePrefix + "-" + System.currentTimeMillis() + fileExtension;
+        String newFilePath = fileDirPath + "/" + newFile;
+        fileSystem.rename(new AlluxioURI(tmpFilePath), new AlluxioURI(newFilePath));
+        fileOutStream = null;
+        tmpFilePath = "";
+        recordsNum = 0;
+        lastRotationTime = System.currentTimeMillis();
+    }
+
+    /**
+     * Deletes the current tmp file, if a tmp file path is recorded.
+     *
+     * @throws AlluxioException if Alluxio rejects the deletion
+     * @throws IOException if the deletion fails
+     */
+    private void deleteTmpFile() throws AlluxioException, IOException {
+        if (tmpFilePath.isEmpty()) {
+            return;
+        }
+        AlluxioURI leftover = new AlluxioURI(tmpFilePath);
+        fileSystem.delete(leftover);
+    }
+
+    /**
+     * Tells whether the current tmp file has to be rotated.
+     *
+     * @param now current time in milliseconds
+     * @return {@code true} if the record threshold is reached, or if time-based rotation is
+     *         enabled (interval other than -1) and the interval has elapsed since the last rotation
+     */
+    private boolean shouldRotate(long now) {
+        if (recordsNum >= rotationRecordsNum) {
+            return true;
+        }
+        return rotationInterval != -1 && (now - lastRotationTime) >= rotationInterval;
+    }
+
+    /**
+     * Serializes an object with Java serialization.
+     *
+     * @param obj the object to serialize
+     * @return the serialized bytes
+     * @throws IOException if serialization fails; the failure is logged before being rethrown
+     */
+    private static byte[] toByteArray(Object obj) throws IOException {
+        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+             ObjectOutputStream objectStream = new ObjectOutputStream(buffer)) {
+            objectStream.writeObject(obj);
+            objectStream.flush();
+            return buffer.toByteArray();
+        } catch (IOException e) {
+            log.error("Failed to serialize the object.", e);
+            throw e;
+        }
+    }
+
+    /**
+     * Converts a value to the bytes written to Alluxio: strings are UTF-8 encoded, byte
+     * arrays are passed through, anything else is Java-serialized.
+     *
+     * @param obj the value to convert
+     * @return the bytes to write
+     * @throws IOException if Java serialization fails
+     */
+    private static byte[] toBytes(Object obj) throws IOException {
+        if (obj instanceof String) {
+            return ((String) obj).getBytes(StandardCharsets.UTF_8);
+        }
+        if (obj instanceof byte[]) {
+            return (byte[]) obj;
+        }
+        return toByteArray(obj);
+    }
+
+ public KeyValue<String, String> extractKeyValue(Record<GenericObject> record) throws JsonProcessingException {
+ // just ignore the key
+ if (alluxioSinkConfig.isSchemaEnable()) {
+ GenericObject recordValue = null;
+ Schema<?> valueSchema = null;
+ if (record.getSchema() != null && record.getSchema() instanceof KeyValueSchema) {
+ KeyValueSchema<GenericObject, GenericObject> keyValueSchema = (KeyValueSchema) record.getSchema();
+ valueSchema = keyValueSchema.getValueSchema();
+ org.apache.pulsar.common.schema.KeyValue<GenericObject, GenericObject> keyValue =
+ (org.apache.pulsar.common.schema.KeyValue<GenericObject, GenericObject>) record.getValue().getNativeObject();
+ recordValue = keyValue.getValue();
+ } else {
+ valueSchema = record.getSchema();
+ recordValue = record.getValue();
+ }
+
+ String value = null;
+ if (recordValue != null) {
+ if (valueSchema != null) {
+ value = stringifyValue(valueSchema, recordValue);
+ } else {
+ if (recordValue.getNativeObject() instanceof byte[]) {
+ value = new String((byte[]) recordValue.getNativeObject(), StandardCharsets.UTF_8);
+ } else {
+ value = recordValue.getNativeObject().toString();
+ }
+ }
+ }
+ return new KeyValue<>(null, value);
+ } else {
+ return new KeyValue<>(null, new String(record.getMessage()
+ .orElseThrow(() -> new IllegalArgumentException("Record does not carry message information"))
+ .getData(), StandardCharsets.UTF_8));
+ }
+ }
+
+    /**
+     * Renders a schema-typed value as a string. Only the JSON schema type is supported.
+     *
+     * @param schema the schema describing {@code val}
+     * @param val the value; cast to {@link GenericRecord} wrapping a {@link JsonNode}
+     * @return the JSON text of the value
+     * @throws JsonProcessingException if the JSON node cannot be serialized
+     * @throws UnsupportedOperationException if the schema type is not JSON
+     */
+    public String stringifyValue(Schema<?> schema, Object val) throws JsonProcessingException {
+        final SchemaType schemaType = schema.getSchemaInfo().getType();
+        // just support json schema
+        if (schemaType != SchemaType.JSON) {
+            throw new UnsupportedOperationException("Unsupported value schemaType=" + schemaType);
+        }
+        GenericRecord genericRecord = (GenericRecord) val;
+        JsonNode jsonNode = (JsonNode) genericRecord.getNativeObject();
+        return objectMapper.writeValueAsString(jsonNode);
+    }
+
+ /** Stages of the write/rotate/commit cycle that write() steps through. */
+ private enum AlluxioState {
+ // records are being appended to the current tmp file
+ WRITE_STARTED,
+ // a rotation threshold was reached; the tmp file is about to be closed and committed
+ FILE_ROTATED,
+ // the tmp file was closed and renamed into the target directory
+ FILE_COMMITTED
+ }
+}
diff --git a/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkConfig.java b/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkConfig.java
new file mode 100644
index 00000000000..fde7a402c22
--- /dev/null
+++ b/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkConfig.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.alluxio.sink;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.alluxio.AlluxioAbstractConfig;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Configuration of the Alluxio sink: file naming, record separation, rotation thresholds,
+ * write type and schema handling, on top of the shared connection settings.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode(callSuper = false)
+@ToString
+@Accessors(chain = true)
+public class AlluxioSinkConfig extends AlluxioAbstractConfig implements Serializable {
+
+    private static final long serialVersionUID = -8917657634001769807L;
+
+    // Initialized to the documented default so that an unset prefix does not end up as the
+    // literal text "null" in generated file names.
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "The prefix of the files to create in the Alluxio directory (e.g. a value of 'TopicA' results"
+            + " in files named topicA-, topicA-, etc being produced)")
+    private String filePrefix = "";
+
+    // Initialized to the documented default for the same reason as filePrefix.
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "The extension to add to the files written to Alluxio (e.g. '.txt')")
+    private String fileExtension = "";
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "The character used to separate records in a text file. If no value is provided then the content"
+            + " from all of the records is concatenated together in one continuous byte array")
+    private char lineSeparator;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "10000L",
+        help = "The number records of Alluxio file rotation")
+    private long rotationRecords = 10000L;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "-1L",
+        help = "The interval to rotate a Alluxio file (in milliseconds)")
+    private long rotationInterval = -1L;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "MUST_CACHE",
+        help = "Default write type when creating Alluxio files. Valid options are `MUST_CACHE` (write only goes to"
+            + " Alluxio and must be stored in Alluxio), `CACHE_THROUGH` (try to cache, write to UnderFS synchronously),"
+            + " `THROUGH` (no cache, write to UnderFS synchronously)")
+    private String writeType = "MUST_CACHE";
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "false",
+        help = "Sets whether the Sink has to take into account the Schema or if it should simply copy the raw message to Alluxio"
+    )
+    private boolean schemaEnable = false;
+
+    /**
+     * Loads the configuration from a YAML file.
+     *
+     * @param yamlFile path of the YAML file
+     * @return the parsed configuration
+     * @throws IOException if the file cannot be read or parsed
+     */
+    public static AlluxioSinkConfig load(String yamlFile) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), AlluxioSinkConfig.class);
+    }
+
+    /**
+     * Loads the configuration from a key/value map by round-tripping it through JSON.
+     *
+     * @param map configuration properties keyed by field name
+     * @return the parsed configuration
+     * @throws IOException if the map cannot be converted
+     */
+    public static AlluxioSinkConfig load(Map<String, Object> map) throws IOException {
+        // one mapper is enough for both the serialization and the parsing step
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(mapper.writeValueAsString(map), AlluxioSinkConfig.class);
+    }
+
+    /**
+     * Validates the connection settings and the rotation thresholds.
+     *
+     * @throws NullPointerException if a required connection property is missing
+     * @throws IllegalArgumentException if {@code rotationRecords} is not positive or
+     *         {@code rotationInterval} is neither -1 nor positive
+     */
+    @Override
+    public void validate() {
+        super.validate();
+        Preconditions.checkArgument(rotationRecords > 0, "rotationRecords must be a positive long.");
+        Preconditions.checkArgument(rotationInterval == -1 || rotationInterval > 0,
+            "rotationInterval must be either -1 or a positive long.");
+    }
+}
diff --git a/pulsar-io/alluxio/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/alluxio/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 00000000000..02314241f28
--- /dev/null
+++ b/pulsar-io/alluxio/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+name: alluxio
+description: Writes data into Alluxio
+sinkClass: org.apache.pulsar.io.alluxio.sink.AlluxioSink
\ No newline at end of file
diff --git a/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkConfigTest.java b/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkConfigTest.java
new file mode 100644
index 00000000000..9454393328c
--- /dev/null
+++ b/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkConfigTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.alluxio.sink;
+
+import alluxio.client.WriteType;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+/**
+ * AlluxioSinkConfig test
+ */
+public class AlluxioSinkConfigTest {
+
+ @Test
+ public final void loadFromYamlFileTest() throws IOException {
+ File yamlFile = getFile("sinkConfig.yaml");
+ String path = yamlFile.getAbsolutePath();
+ AlluxioSinkConfig config = AlluxioSinkConfig.load(path);
+ assertNotNull(config);
+ assertEquals("localhost", config.getAlluxioMasterHost());
+ assertEquals(Integer.parseInt("19998"), config.getAlluxioMasterPort());
+ assertEquals("pulsar", config.getAlluxioDir());
+ assertEquals("TopicA", config.getFilePrefix());
+ assertEquals(".txt", config.getFileExtension());
+ assertEquals("\n".charAt(0), config.getLineSeparator());
+ assertEquals(Long.parseLong("100"), config.getRotationRecords());
+ assertEquals(Long.parseLong("-1"), config.getRotationInterval());
+ }
+
+ @Test
+ public final void loadFromMapTest() throws IOException {
+ Map<String, Object> map = new HashMap<>();
+ map.put("alluxioMasterHost", "localhost");
+ map.put("alluxioMasterPort", "19998");
+ map.put("alluxioDir", "pulsar");
+ map.put("filePrefix", "TopicA");
+ map.put("fileExtension", ".txt");
+ map.put("lineSeparator", "\n");
+ map.put("rotationRecords", "100");
+ map.put("rotationInterval", "-1");
+
+ AlluxioSinkConfig config = AlluxioSinkConfig.load(map);
+ assertNotNull(config);
+ assertEquals("localhost", config.getAlluxioMasterHost());
+ assertEquals(Integer.parseInt("19998"), config.getAlluxioMasterPort());
+ assertEquals("pulsar", config.getAlluxioDir());
+ assertEquals("TopicA", config.getFilePrefix());
+ assertEquals(".txt", config.getFileExtension());
+ assertEquals("\n".charAt(0), config.getLineSeparator());
+ assertEquals(Long.parseLong("100"), config.getRotationRecords());
+ assertEquals(Long.parseLong("-1"), config.getRotationInterval());
+ }
+
+ @Test
+ public final void validateTest() throws IOException {
+ Map<String, Object> map = new HashMap<>();
+ map.put("alluxioMasterHost", "localhost");
+ map.put("alluxioMasterPort", "19998");
+ map.put("alluxioDir", "pulsar");
+ map.put("filePrefix", "TopicA");
+ map.put("fileExtension", ".txt");
+ map.put("lineSeparator", "\n");
+ map.put("rotationRecords", "100");
+ map.put("rotationInterval", "-1");
+
+ AlluxioSinkConfig config = AlluxioSinkConfig.load(map);
+ config.validate();
+ }
+
+ @Test(expectedExceptions = NullPointerException.class,
+ expectedExceptionsMessageRegExp = "alluxioDir property not set.")
+ public final void missingValidateAlluxioDirTest() throws IOException {
+ Map<String, Object> map = new HashMap<>();
+ map.put("alluxioMasterHost", "localhost");
+ map.put("alluxioMasterPort", "19998");
+ map.put("filePrefix", "TopicA");
+ map.put("fileExtension", ".txt");
+ map.put("lineSeparator", "\n");
+ map.put("rotationRecords", "100");
+ map.put("rotationInterval", "-1");
+
+ AlluxioSinkConfig config = AlluxioSinkConfig.load(map);
+ config.validate();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "rotationRecords must be a positive long.")
+ public final void invalidRotationRecordsTest() throws IOException {
+ Map<String, Object> map = new HashMap<>();
+ map.put("alluxioMasterHost", "localhost");
+ map.put("alluxioMasterPort", "19998");
+ map.put("alluxioDir", "pulsar");
+ map.put("filePrefix", "TopicA");
+ map.put("fileExtension", ".txt");
+ map.put("lineSeparator", "\n");
+ map.put("rotationRecords", "-100");
+ map.put("rotationInterval", "-1");
+
+ AlluxioSinkConfig config = AlluxioSinkConfig.load(map);
+ config.validate();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "rotationInterval must be either -1 or a positive long.")
+ public final void invalidRotationIntervalTest() throws IOException {
+ Map<String, Object> map = new HashMap<>();
+ map.put("alluxioMasterHost", "localhost");
+ map.put("alluxioMasterPort", "19998");
+ map.put("alluxioDir", "pulsar");
+ map.put("filePrefix", "TopicA");
+ map.put("fileExtension", ".txt");
+ map.put("lineSeparator", "\n");
+ map.put("rotationRecords", "100");
+ map.put("rotationInterval", "-1000");
+
+ AlluxioSinkConfig config = AlluxioSinkConfig.load(map);
+ config.validate();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "No enum constant alluxio.client.WriteType.NOTSUPPORT")
+ public final void invalidClientModeTest() throws IOException {
+ Map<String, Object> map = new HashMap<>();
+ map.put("alluxioMasterHost", "localhost");
+ map.put("alluxioMasterPort", "19998");
+ map.put("alluxioDir", "pulsar");
+ map.put("filePrefix", "TopicA");
+ map.put("fileExtension", ".txt");
+ map.put("lineSeparator", "\n");
+ map.put("rotationRecords", "100");
+ map.put("rotationInterval", "-1");
+ map.put("writeType", "NotSupport");
+
+ AlluxioSinkConfig config = AlluxioSinkConfig.load(map);
+ config.validate();
+
+ WriteType.valueOf(config.getWriteType().toUpperCase());
+ }
+
+
+ private File getFile(String name) {
+ ClassLoader classLoader = getClass().getClassLoader();
+ return new File(classLoader.getResource(name).getFile());
+ }
+}
diff --git a/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkTest.java b/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkTest.java
new file mode 100644
index 00000000000..366f276ae86
--- /dev/null
+++ b/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.alluxio.sink;
+
+import alluxio.AlluxioURI;
+import alluxio.client.WriteType;
+import alluxio.client.file.FileSystem;
+import alluxio.client.file.URIStatus;
+import alluxio.conf.PropertyKey;
+import alluxio.conf.ServerConfiguration;
+import alluxio.master.LocalAlluxioCluster;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SinkContext;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Alluxio Sink test
+ */
+@Slf4j
+public class AlluxioSinkTest {
+
+ @Mock
+ protected SinkContext mockSinkContext;
+
+ protected Map<String, Object> map;
+ protected AlluxioSink sink;
+
+ @Mock
+ protected Record<GenericObject> mockRecord;
+
+ static Schema kvSchema;
+ static Schema<Foobar> valueSchema;
+ static GenericSchema<GenericRecord> genericSchema;
+ static GenericRecord fooBar;
+
+ @BeforeClass
+ public static void init() {
+ valueSchema = Schema.JSON(Foobar.class);
+ genericSchema = Schema.generic(valueSchema.getSchemaInfo());
+ fooBar = genericSchema.newRecordBuilder()
+ .set("name", "foo")
+ .set("address", "foobar")
+ .set("age", 20)
+ .build();
+ kvSchema = Schema.KeyValue(Schema.STRING, genericSchema, KeyValueEncodingType.SEPARATED);
+ }
+
+ @BeforeMethod
+ public final void setUp() {
+ map = new HashMap<>();
+ map.put("alluxioMasterHost", "localhost");
+ map.put("alluxioMasterPort", "19998");
+ map.put("alluxioDir", "/pulsar");
+ map.put("filePrefix", "prefix");
+ map.put("schemaEnable", "true");
+
+ mockRecord = mock(Record.class);
+ mockSinkContext = mock(SinkContext.class);
+
+ when(mockRecord.getKey()).thenAnswer(new Answer<Optional<String>>() {
+ int count = 0;
+ public Optional<String> answer(InvocationOnMock invocation) throws Throwable {
+ return Optional.of( "key-" + count++);
+ }});
+
+ when(mockRecord.getValue()).thenAnswer((Answer<GenericObject>) invocation -> new GenericObject() {
+ @Override
+ public SchemaType getSchemaType() {
+ return SchemaType.KEY_VALUE;
+ }
+
+ @Override
+ public Object getNativeObject() {
+ return new KeyValue<String, GenericObject>((String) fooBar.getField("address"), fooBar);
+ }
+ });
+
+ when(mockRecord.getSchema()).thenAnswer((Answer<Schema<KeyValue<String, Foobar>>>) invocation -> kvSchema);
+ }
+
+ @Test
+ public void openTest() throws Exception {
+ map.put("filePrefix", "TopicA");
+ map.put("fileExtension", ".txt");
+ map.put("lineSeparator", "\n");
+ map.put("rotationRecords", "100");
+
+ String alluxioDir = "/pulsar";
+
+ LocalAlluxioCluster cluster = setupSingleMasterCluster();
+
+ sink = new AlluxioSink();
+ sink.open(map, mockSinkContext);
+
+ FileSystem client = cluster.getClient();
+
+ AlluxioURI alluxioURI = new AlluxioURI(alluxioDir);
+ Assert.assertTrue(client.exists(alluxioURI));
+
+ String alluxioTmpDir = FilenameUtils.concat(alluxioDir, "tmp");
+ AlluxioURI alluxioTmpURI = new AlluxioURI(alluxioTmpDir);
+ Assert.assertTrue(client.exists(alluxioTmpURI));
+
+ sink.close();
+ cluster.stop();
+ }
+
+ @Test
+ public void writeAndCloseTest() throws Exception {
+ map.put("filePrefix", "TopicA");
+ map.put("fileExtension", ".txt");
+ map.put("lineSeparator", "\n");
+ map.put("rotationRecords", "1");
+ map.put("writeType", "THROUGH");
+ map.put("alluxioDir", "/pulsar");
+
+ String alluxioDir = "/pulsar";
+
+ LocalAlluxioCluster cluster = setupSingleMasterCluster();
+
+ sink = new AlluxioSink();
+ sink.open(map, mockSinkContext);
+
+ sink.write(() -> new GenericObject() {
+ @Override
+ public SchemaType getSchemaType() {
+ return SchemaType.KEY_VALUE;
+ }
+
+ @Override
+ public Object getNativeObject() {
+ return new KeyValue<>((String) fooBar.getField("address"), fooBar);
+ }
+ });
+
+ FileSystem client = cluster.getClient();
+
+ AlluxioURI alluxioURI = new AlluxioURI(alluxioDir);
+ Assert.assertTrue(client.exists(alluxioURI));
+
+ String alluxioTmpDir = FilenameUtils.concat(alluxioDir, "tmp");
+ AlluxioURI alluxioTmpURI = new AlluxioURI(alluxioTmpDir);
+ Assert.assertTrue(client.exists(alluxioTmpURI));
+
+ List<URIStatus> listAlluxioDirStatus = client.listStatus(alluxioURI);
+
+ List<String> pathList = listAlluxioDirStatus.stream().map(URIStatus::getPath).collect(Collectors.toList());
+
+ Assert.assertEquals(pathList.size(), 2);
+
+ for (String path : pathList) {
+ if (path.contains("tmp")) {
+ Assert.assertEquals(path, "/pulsar/tmp");
+ } else {
+ Assert.assertTrue(path.startsWith("/pulsar/TopicA-"));
+ }
+ }
+
+ sink.close();
+ cluster.stop();
+ }
+
+ private LocalAlluxioCluster setupSingleMasterCluster() throws Exception {
+ // Setup and start the local alluxio cluster
+ LocalAlluxioCluster cluster = new LocalAlluxioCluster();
+ cluster.initConfiguration(getTestName(getClass().getSimpleName(), LocalAlluxioCluster.DEFAULT_TEST_NAME));
+ ServerConfiguration.set(PropertyKey.USER_FILE_WRITE_TYPE_DEFAULT, WriteType.MUST_CACHE);
+ cluster.start();
+ return cluster;
+ }
+
+ public String getTestName(String className, String methodName) {
+ String testName = className + "-" + methodName;
+ // cannot use these characters in the name/path: . [ ]
+ testName = testName.replace(".", "-");
+ testName = testName.replace("[", "-");
+ testName = testName.replace("]", "");
+ return testName;
+ }
+}
diff --git a/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/Foobar.java b/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/Foobar.java
new file mode 100644
index 00000000000..0a6c44ca9ce
--- /dev/null
+++ b/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/Foobar.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.alluxio.sink;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class Foobar { // test POJO; the three Lombok annotations above supply accessors and both constructors
+ String name; // fields are package-private (no access modifier)
+ String address;
+ int age;
+}
diff --git a/pulsar-io/alluxio/src/test/resources/sinkConfig.yaml b/pulsar-io/alluxio/src/test/resources/sinkConfig.yaml
new file mode 100644
index 00000000000..f7907d4e02f
--- /dev/null
+++ b/pulsar-io/alluxio/src/test/resources/sinkConfig.yaml
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+{
+"alluxioMasterHost": "localhost",
+"alluxioMasterPort": "19998",
+"alluxioDir": "pulsar",
+"filePrefix": "TopicA",
+"fileExtension": ".txt",
+"lineSeparator": "\n",
+"rotationRecords": "100",
+"rotationInterval": "-1"
+}
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index a5e096aff59..fb83c6fba19 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -112,6 +112,7 @@
<module>influxdb</module>
<module>dynamodb</module>
<module>nsq</module>
+ <module>alluxio</module>
</modules>
</profile>
diff --git a/site2/docs/io-alluxio.md b/site2/docs/io-alluxio.md
new file mode 100644
index 00000000000..59c3247f9e6
--- /dev/null
+++ b/site2/docs/io-alluxio.md
@@ -0,0 +1,62 @@
+---
+id: io-alluxio
+title: Alluxio sink connector
+sidebar_label: Alluxio sink connector
+---
+
+## Sink
+
+The Alluxio sink connector pulls messages from Pulsar topics and persists the messages to an Alluxio directory.
+
+## Configuration
+
+The configuration of the Alluxio sink connector has the following properties.
+
+### Property
+
+| Name | Type | Required | Default | Description |
+|------|----------|----------|---------|-------------|
+| `alluxioMasterHost` | String | true | "" (empty string) | The hostname of Alluxio master. |
+| `alluxioMasterPort` | int | true | 19998 | The port that Alluxio master node runs on. |
+| `alluxioDir` | String | true | "" (empty string) | The Alluxio directory that files are read from or written to. |
+| `securityLoginUser` | String | false | "" (empty string) | When `alluxio.security.authentication.type` is set to `SIMPLE` or `CUSTOM`, user application uses this property to indicate the user requesting Alluxio service. If it is not set explicitly, the OS login user is used. |
+| `filePrefix` | String | false | "" (empty string) | The prefix of the files to create in the Alluxio directory (e.g. a value of 'TopicA' results in files whose names start with `TopicA-`). |
+| `fileExtension` | String | false | "" (empty string) | The extension to add to the files written to Alluxio (e.g. '.txt'). |
+| `lineSeparator` | String | false | "" (empty string) | The character used to separate records in a text file. If no value is provided, then the content from all of the records is concatenated together in one continuous byte array. |
+| `rotationRecords` | long | false | 10000 | The number of records after which an Alluxio file is rotated. |
+| `rotationInterval` | long | false | -1 | The interval at which an Alluxio file is rotated (in milliseconds). Must be either -1 or a positive value. |
+| `schemaEnable` | boolean | false | false | Sets whether the Sink has to take into account the Schema or if it should simply copy the raw message to Alluxio. |
+| `writeType` | String | false | `MUST_CACHE` | Default write type when creating Alluxio files. Valid options are `MUST_CACHE` (write only goes to Alluxio and must be stored in Alluxio), `CACHE_THROUGH` (try to cache, write to UnderFS synchronously), `THROUGH` (no cache, write to UnderFS synchronously). |
+
+### Example
+
+Before using the Alluxio sink connector, you need to create a configuration file in the directory from which you start the Pulsar service (i.e. `PULSAR_HOME`), in one of the following formats.
+
+* JSON
+
+ ```json
+ {
+ "alluxioMasterHost": "localhost",
+ "alluxioMasterPort": "19998",
+ "alluxioDir": "pulsar",
+ "filePrefix": "TopicA",
+ "fileExtension": ".txt",
+ "lineSeparator": "\n",
+ "rotationRecords": "100",
+ "rotationInterval": "-1"
+ }
+ ```
+
+* YAML
+
+ ```yaml
+ configs:
+ alluxioMasterHost: "localhost"
+ alluxioMasterPort: "19998"
+ alluxioDir: "pulsar"
+ filePrefix: "TopicA"
+ fileExtension: ".txt"
+ lineSeparator: "\n"
+ rotationRecords: 100
+ rotationInterval: "-1"
+ ```
\ No newline at end of file