You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ti...@apache.org on 2022/12/10 16:59:22 UTC

[pulsar] branch master updated: [feat][io] Add Alluxio sink connector (#3823)

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b0695fb2192 [feat][io] Add Alluxio sink connector (#3823)
b0695fb2192 is described below

commit b0695fb2192e04b9d8c522a1944a3b8e18755e15
Author: Fangbin Sun <su...@gmail.com>
AuthorDate: Sun Dec 11 00:59:16 2022 +0800

    [feat][io] Add Alluxio sink connector (#3823)
---
 pulsar-io/alluxio/pom.xml                          | 118 ++++++++
 .../pulsar/io/alluxio/AlluxioAbstractConfig.java   |  76 +++++
 .../apache/pulsar/io/alluxio/sink/AlluxioSink.java | 337 +++++++++++++++++++++
 .../pulsar/io/alluxio/sink/AlluxioSinkConfig.java  | 112 +++++++
 .../resources/META-INF/services/pulsar-io.yaml     |  21 ++
 .../io/alluxio/sink/AlluxioSinkConfigTest.java     | 168 ++++++++++
 .../pulsar/io/alluxio/sink/AlluxioSinkTest.java    | 220 ++++++++++++++
 .../org/apache/pulsar/io/alluxio/sink/Foobar.java  |  32 ++
 .../alluxio/src/test/resources/sinkConfig.yaml     |  29 ++
 pulsar-io/pom.xml                                  |   1 +
 site2/docs/io-alluxio.md                           |  62 ++++
 11 files changed, 1176 insertions(+)

diff --git a/pulsar-io/alluxio/pom.xml b/pulsar-io/alluxio/pom.xml
new file mode 100644
index 00000000000..e8a5436d612
--- /dev/null
+++ b/pulsar-io/alluxio/pom.xml
@@ -0,0 +1,118 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>pulsar-io</artifactId>
+        <groupId>org.apache.pulsar</groupId>
+        <version>2.11.0-SNAPSHOT</version>
+    </parent>
+
+    <properties>
+        <alluxio.version>2.5.0</alluxio.version>
+        <metrics.version>4.1.11</metrics.version>
+        <grpc.version>1.37.0</grpc.version>
+    </properties>
+
+    <artifactId>pulsar-io-alluxio</artifactId>
+    <name>Pulsar IO :: Alluxio</name>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-io-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-client-original</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.alluxio</groupId>
+            <artifactId>alluxio-core-client-fs</artifactId>
+            <version>${alluxio.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>grpc-netty</artifactId>
+                    <groupId>io.grpc</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.alluxio</groupId>
+            <artifactId>alluxio-minicluster</artifactId>
+            <version>${alluxio.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.glassfish</groupId>
+                    <artifactId>javax.el</artifactId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>grpc-netty</artifactId>
+                    <groupId>io.grpc</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-yaml</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
+        <!-- alluxio grpc dependency need higher version -->
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-netty</artifactId>
+            <version>${grpc.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-jvm</artifactId>
+            <version>${metrics.version}</version>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-nar-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/AlluxioAbstractConfig.java b/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/AlluxioAbstractConfig.java
new file mode 100644
index 00000000000..96fe38a5893
--- /dev/null
+++ b/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/AlluxioAbstractConfig.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.alluxio;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.io.Serializable;
+
+/**
+ * Configuration object for all Alluxio Sink components.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public class AlluxioAbstractConfig implements Serializable {
+
+    private static final long serialVersionUID = 3727671407445918309L;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The hostname of Alluxio master")
+    private String alluxioMasterHost;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "19998",
+        help = "The port that Alluxio master node runs on")
+    private int alluxioMasterPort = 19998;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The Alluxio directory from which files should be read from or written to")
+    private String alluxioDir;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "When `alluxio.security.authentication.type` is set to `SIMPLE` or `CUSTOM`, user application uses"
+            + " this property to indicate the user requesting Alluxio service. If it is not set explicitly,"
+            + " the OS login user is used")
+    private String securityLoginUser;
+
+    public void validate() {
+        Preconditions.checkNotNull(alluxioMasterHost, "alluxioMasterHost property not set.");
+        Preconditions.checkNotNull(alluxioMasterPort, "alluxioMasterPort property not set.");
+        Preconditions.checkNotNull(alluxioDir, "alluxioDir property not set.");
+    }
+}
diff --git a/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSink.java b/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSink.java
new file mode 100644
index 00000000000..87cc643a2bc
--- /dev/null
+++ b/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSink.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.alluxio.sink;
+
+import alluxio.AlluxioURI;
+import alluxio.client.WriteType;
+import alluxio.client.file.FileOutStream;
+import alluxio.client.file.FileSystem;
+import alluxio.conf.InstancedConfiguration;
+import alluxio.conf.PropertyKey;
+import alluxio.exception.AlluxioException;
+import alluxio.grpc.CreateFilePOptions;
+import alluxio.grpc.WritePType;
+import alluxio.util.FileSystemOptions;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Alluxio sink that treats incoming messages on the input topic as Strings
+ * and write identical key/value pairs.
+ */
+@Connector(
+        name = "alluxio",
+        type = IOType.SINK,
+        help = "The sink connector is used for moving records from Pulsar to Alluxio.",
+        configClass = AlluxioSinkConfig.class)
+@Slf4j
+public class AlluxioSink implements Sink<GenericObject> {
+
+    private FileSystem fileSystem;
+    private FileOutStream fileOutStream;
+    private CreateFilePOptions.Builder optionsBuilder;
+    private long recordsNum;
+    private String tmpFilePath;
+    private String fileDirPath;
+    private String tmpFileDirPath;
+    private long lastRotationTime;
+    private long rotationRecordsNum;
+    private long rotationInterval;
+    private AlluxioSinkConfig alluxioSinkConfig;
+    private AlluxioState alluxioState;
+
+    private InstancedConfiguration configuration = InstancedConfiguration.defaults();
+
+    private ObjectMapper objectMapper = new ObjectMapper();
+
+    private List<Record<GenericObject>> recordsToAck;
+
+    @Override
+    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+        alluxioSinkConfig = AlluxioSinkConfig.load(config);
+        alluxioSinkConfig.validate();
+
+        // initialize FileSystem
+        String alluxioMasterHost = alluxioSinkConfig.getAlluxioMasterHost();
+        int alluxioMasterPort = alluxioSinkConfig.getAlluxioMasterPort();
+        InstancedConfiguration.defaults().set(PropertyKey.MASTER_HOSTNAME, alluxioMasterHost);
+        configuration.set(PropertyKey.MASTER_RPC_PORT, alluxioMasterPort);
+        if (alluxioSinkConfig.getSecurityLoginUser() != null) {
+            configuration.set(PropertyKey.SECURITY_LOGIN_USERNAME, alluxioSinkConfig.getSecurityLoginUser());
+        }
+        fileSystem = FileSystem.Factory.create(configuration);
+
+        // initialize alluxio dirs
+        String alluxioDir = alluxioSinkConfig.getAlluxioDir();
+        fileDirPath = alluxioDir.startsWith("/") ? alluxioDir : "/" + alluxioDir;
+        tmpFileDirPath = fileDirPath + "/tmp";
+
+        AlluxioURI alluxioDirPath = new AlluxioURI(fileDirPath);
+        if (!fileSystem.exists(alluxioDirPath)) {
+            fileSystem.createDirectory(alluxioDirPath);
+        }
+
+        AlluxioURI tmpAlluxioDirPath = new AlluxioURI(tmpFileDirPath);
+        if (!fileSystem.exists(tmpAlluxioDirPath)) {
+            fileSystem.createDirectory(tmpAlluxioDirPath);
+        }
+
+        optionsBuilder = FileSystemOptions.createFileDefaults(configuration).toBuilder();
+
+        recordsNum = 0;
+        recordsToAck = Lists.newArrayList();
+        tmpFilePath = "";
+        alluxioState = AlluxioState.WRITE_STARTED;
+
+        lastRotationTime = System.currentTimeMillis();
+        rotationRecordsNum = alluxioSinkConfig.getRotationRecords();
+        rotationInterval =  alluxioSinkConfig.getRotationInterval();
+    }
+
+    @Override
+    public void write(Record<GenericObject> record) {
+        long now = System.currentTimeMillis();
+
+        switch (alluxioState) {
+            case WRITE_STARTED:
+                try {
+                    writeToAlluxio(record);
+                    if (!shouldRotate(now)) {
+                        break;
+                    }
+                    alluxioState = AlluxioState.FILE_ROTATED;
+                } catch (AlluxioException | IOException e) {
+                    log.error("Unable to write record to alluxio.", e);
+                    record.fail();
+                    break;
+                }
+            case FILE_ROTATED:
+                try {
+                    closeAndCommitTmpFile();
+                    alluxioState = AlluxioState.FILE_COMMITTED;
+                    ackRecords();
+                } catch (AlluxioException | IOException e) {
+                    log.error("Unable to flush records to alluxio.", e);
+                    failRecords();
+                    try {
+                        deleteTmpFile();
+                    } catch (AlluxioException | IOException e1) {
+                        log.error("Failed to delete tmp cache file.", e);
+                    }
+                    break;
+                }
+            case FILE_COMMITTED:
+                alluxioState = AlluxioState.WRITE_STARTED;
+                break;
+            default:
+                log.error("{} is not a valid state when writing record to alluxio temp dir {}.",
+                    alluxioState, tmpFileDirPath);
+                break;
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        // flush records in the tmpFile when closing sink
+        try {
+            closeAndCommitTmpFile();
+            ackRecords();
+        } catch (AlluxioException | IOException e) {
+            log.error("Unable to flush records to alluxio.", e);
+            failRecords();
+        }
+        deleteTmpFile();
+    }
+
+    private void ackRecords() {
+        recordsToAck.forEach(Record::ack);
+        recordsToAck.clear();
+    }
+
+    private void failRecords() {
+        recordsToAck.forEach(Record::fail);
+        recordsToAck.clear();
+    }
+
+    private void writeToAlluxio(Record<GenericObject> record) throws AlluxioException, IOException {
+        KeyValue<String, String> keyValue = extractKeyValue(record);
+        if (fileOutStream == null) {
+            createTmpFile();
+        }
+        fileOutStream.write(toBytes(keyValue.getValue()));
+        if (alluxioSinkConfig.getLineSeparator() != '\u0000') {
+            fileOutStream.write(alluxioSinkConfig.getLineSeparator());
+        }
+        recordsNum++;
+        recordsToAck.add(record);
+    }
+
+    private void createTmpFile() throws AlluxioException, IOException {
+        UUID id = UUID.randomUUID();
+        String fileExtension = alluxioSinkConfig.getFileExtension();
+        tmpFilePath = tmpFileDirPath + "/" + id.toString() + "_tmp" + fileExtension;
+        if (alluxioSinkConfig.getWriteType() != null) {
+            WritePType writePType;
+            try {
+                writePType = WritePType.valueOf(alluxioSinkConfig.getWriteType().toUpperCase());
+            } catch (IllegalArgumentException e) {
+                throw new IllegalArgumentException("Illegal write type when creating Alluxio files, valid values are: "
+                    + Arrays.asList(WriteType.values()));
+            }
+            optionsBuilder.setWriteType(writePType);
+        }
+        fileOutStream = fileSystem.createFile(new AlluxioURI(tmpFilePath), optionsBuilder.build());
+    }
+
+    private void closeAndCommitTmpFile() throws AlluxioException, IOException {
+        // close the tmpFile
+        if (fileOutStream != null) {
+            fileOutStream.close();
+        }
+        // commit the tmpFile
+        String filePrefix = alluxioSinkConfig.getFilePrefix();
+        String fileExtension = alluxioSinkConfig.getFileExtension();
+        String newFile = filePrefix + "-" + System.currentTimeMillis() + fileExtension;
+        String newFilePath = fileDirPath + "/" + newFile;
+        fileSystem.rename(new AlluxioURI(tmpFilePath), new AlluxioURI(newFilePath));
+        fileOutStream = null;
+        tmpFilePath = "";
+        recordsNum = 0;
+        lastRotationTime = System.currentTimeMillis();
+    }
+
+    private void deleteTmpFile() throws AlluxioException, IOException {
+        if (!tmpFilePath.equals("")) {
+            fileSystem.delete(new AlluxioURI(tmpFilePath));
+        }
+    }
+
+    private boolean shouldRotate(long now) {
+        boolean rotated = false;
+        if (recordsNum >= rotationRecordsNum) {
+            rotated = true;
+        } else {
+            if (rotationInterval != -1 && (now - lastRotationTime) >= rotationInterval) {
+                rotated = true;
+            }
+        }
+        return rotated;
+    }
+
+    private static byte[] toByteArray(Object obj) throws IOException {
+        byte[] bytes = null;
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+            oos.writeObject(obj);
+            oos.flush();
+            bytes = baos.toByteArray();
+        } catch (IOException e) {
+            log.error("Failed to serialize the object.", e);
+            throw e;
+        }
+        return bytes;
+    }
+
+    private static byte[] toBytes(Object obj) throws IOException {
+        byte[] bytes;
+        if (obj instanceof String) {
+            String s = (String) obj;
+            bytes = s.getBytes(StandardCharsets.UTF_8);
+        } else if (obj instanceof byte[]) {
+            bytes = (byte[]) obj;
+        } else {
+            bytes = toByteArray(obj);
+        }
+        return bytes;
+    }
+
+    public KeyValue<String, String> extractKeyValue(Record<GenericObject> record) throws JsonProcessingException {
+        // just ignore the key
+        if (alluxioSinkConfig.isSchemaEnable()) {
+            GenericObject recordValue = null;
+            Schema<?> valueSchema = null;
+            if (record.getSchema() != null && record.getSchema() instanceof KeyValueSchema) {
+                KeyValueSchema<GenericObject, GenericObject> keyValueSchema = (KeyValueSchema) record.getSchema();
+                valueSchema = keyValueSchema.getValueSchema();
+                org.apache.pulsar.common.schema.KeyValue<GenericObject, GenericObject> keyValue =
+                        (org.apache.pulsar.common.schema.KeyValue<GenericObject, GenericObject>) record.getValue().getNativeObject();
+                recordValue = keyValue.getValue();
+            } else {
+                valueSchema = record.getSchema();
+                recordValue = record.getValue();
+            }
+
+            String value = null;
+            if (recordValue != null) {
+                if (valueSchema != null) {
+                    value = stringifyValue(valueSchema, recordValue);
+                } else {
+                    if (recordValue.getNativeObject() instanceof byte[]) {
+                        value = new String((byte[]) recordValue.getNativeObject(), StandardCharsets.UTF_8);
+                    } else {
+                        value = recordValue.getNativeObject().toString();
+                    }
+                }
+            }
+            return new KeyValue<>(null, value);
+        } else {
+            return new KeyValue<>(null, new String(record.getMessage()
+                            .orElseThrow(() -> new IllegalArgumentException("Record does not carry message information"))
+                            .getData(), StandardCharsets.UTF_8));
+        }
+    }
+
+    public String stringifyValue(Schema<?> schema, Object val) throws JsonProcessingException {
+        // just support json schema
+        if (schema.getSchemaInfo().getType() == SchemaType.JSON) {
+            JsonNode jsonNode = (JsonNode) ((GenericRecord) val).getNativeObject();
+            return objectMapper.writeValueAsString(jsonNode);
+        }
+        throw new UnsupportedOperationException("Unsupported value schemaType=" + schema.getSchemaInfo().getType());
+    }
+
+    private enum AlluxioState {
+        WRITE_STARTED,
+        FILE_ROTATED,
+        FILE_COMMITTED
+    }
+}
diff --git a/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkConfig.java b/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkConfig.java
new file mode 100644
index 00000000000..fde7a402c22
--- /dev/null
+++ b/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkConfig.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.alluxio.sink;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.alluxio.AlluxioAbstractConfig;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode(callSuper = false)
+@ToString
+@Accessors(chain = true)
+public class AlluxioSinkConfig extends AlluxioAbstractConfig implements Serializable {
+
+    private static final long serialVersionUID = -8917657634001769807L;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "The prefix of the files to create in the Alluxio directory (e.g. a value of 'TopicA' results"
+            + " in files named topicA-, topicA-, etc being produced)")
+    private String filePrefix;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "The extension to add to the files written to Alluxio (e.g. '.txt')")
+    private String fileExtension;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "The character used to separate records in a text file. If no value is provided then the content"
+            + " from all of the records is concatenated together in one continuous byte array")
+    private char lineSeparator;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "10000L",
+        help = "The number records of Alluxio file rotation")
+    private long rotationRecords = 10000L;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "-1L",
+        help = "The interval to rotate a Alluxio file (in milliseconds)")
+    private long rotationInterval = -1L;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "MUST_CACHE",
+        help = "Default write type when creating Alluxio files. Valid options are `MUST_CACHE` (write only goes to"
+            + " Alluxio and must be stored in Alluxio), `CACHE_THROUGH` (try to cache, write to UnderFS synchronously),"
+            + " `THROUGH` (no cache, write to UnderFS synchronously)")
+    private String writeType = "MUST_CACHE";
+
+    @FieldDoc(
+            required = false,
+            defaultValue = "false",
+            help = "Sets whether the Sink has to take into account the Schema or if it should simply copy the raw message to Alluxio"
+    )
+    private boolean schemaEnable = false;
+
+    public static AlluxioSinkConfig load(String yamlFile) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), AlluxioSinkConfig.class);
+    }
+
+    public static AlluxioSinkConfig load(Map<String, Object> map) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(new ObjectMapper().writeValueAsString(map), AlluxioSinkConfig.class);
+    }
+
+    @Override
+    public void validate() {
+        super.validate();
+        Preconditions.checkArgument(rotationRecords > 0, "rotationRecords must be a positive long.");
+        Preconditions.checkArgument(rotationInterval == -1 || rotationInterval > 0,
+            "rotationInterval must be either -1 or a positive long.");
+    }
+}
diff --git a/pulsar-io/alluxio/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/alluxio/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 00000000000..02314241f28
--- /dev/null
+++ b/pulsar-io/alluxio/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+name: alluxio
+description: Writes data into Alluxio
+sinkClass: org.apache.pulsar.io.alluxio.sink.AlluxioSink
\ No newline at end of file
diff --git a/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkConfigTest.java b/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkConfigTest.java
new file mode 100644
index 00000000000..9454393328c
--- /dev/null
+++ b/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkConfigTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.alluxio.sink;
+
+import alluxio.client.WriteType;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+/**
+ * AlluxioSinkConfig test
+ */
+public class AlluxioSinkConfigTest {
+
+    @Test
+    public final void loadFromYamlFileTest() throws IOException {
+        File yamlFile = getFile("sinkConfig.yaml");
+        String path = yamlFile.getAbsolutePath();
+        AlluxioSinkConfig config = AlluxioSinkConfig.load(path);
+        assertNotNull(config);
+        assertEquals("localhost", config.getAlluxioMasterHost());
+        assertEquals(Integer.parseInt("19998"), config.getAlluxioMasterPort());
+        assertEquals("pulsar", config.getAlluxioDir());
+        assertEquals("TopicA", config.getFilePrefix());
+        assertEquals(".txt", config.getFileExtension());
+        assertEquals("\n".charAt(0), config.getLineSeparator());
+        assertEquals(Long.parseLong("100"), config.getRotationRecords());
+        assertEquals(Long.parseLong("-1"), config.getRotationInterval());
+    }
+
+    @Test
+    public final void loadFromMapTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("alluxioMasterHost", "localhost");
+        map.put("alluxioMasterPort", "19998");
+        map.put("alluxioDir", "pulsar");
+        map.put("filePrefix", "TopicA");
+        map.put("fileExtension", ".txt");
+        map.put("lineSeparator", "\n");
+        map.put("rotationRecords", "100");
+        map.put("rotationInterval", "-1");
+
+        AlluxioSinkConfig config = AlluxioSinkConfig.load(map);
+        assertNotNull(config);
+        assertEquals("localhost", config.getAlluxioMasterHost());
+        assertEquals(Integer.parseInt("19998"), config.getAlluxioMasterPort());
+        assertEquals("pulsar", config.getAlluxioDir());
+        assertEquals("TopicA", config.getFilePrefix());
+        assertEquals(".txt", config.getFileExtension());
+        assertEquals("\n".charAt(0), config.getLineSeparator());
+        assertEquals(Long.parseLong("100"), config.getRotationRecords());
+        assertEquals(Long.parseLong("-1"), config.getRotationInterval());
+    }
+
+    @Test
+    public final void validateTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("alluxioMasterHost", "localhost");
+        map.put("alluxioMasterPort", "19998");
+        map.put("alluxioDir", "pulsar");
+        map.put("filePrefix", "TopicA");
+        map.put("fileExtension", ".txt");
+        map.put("lineSeparator", "\n");
+        map.put("rotationRecords", "100");
+        map.put("rotationInterval", "-1");
+
+        AlluxioSinkConfig config = AlluxioSinkConfig.load(map);
+        config.validate();
+    }
+
+    @Test(expectedExceptions = NullPointerException.class,
+        expectedExceptionsMessageRegExp = "alluxioDir property not set.")
+    public final void missingValidateAlluxioDirTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("alluxioMasterHost", "localhost");
+        map.put("alluxioMasterPort", "19998");
+        map.put("filePrefix", "TopicA");
+        map.put("fileExtension", ".txt");
+        map.put("lineSeparator", "\n");
+        map.put("rotationRecords", "100");
+        map.put("rotationInterval", "-1");
+
+        AlluxioSinkConfig config = AlluxioSinkConfig.load(map);
+        config.validate();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "rotationRecords must be a positive long.")
+    public final void invalidRotationRecordsTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("alluxioMasterHost", "localhost");
+        map.put("alluxioMasterPort", "19998");
+        map.put("alluxioDir", "pulsar");
+        map.put("filePrefix", "TopicA");
+        map.put("fileExtension", ".txt");
+        map.put("lineSeparator", "\n");
+        map.put("rotationRecords", "-100");
+        map.put("rotationInterval", "-1");
+
+        AlluxioSinkConfig config = AlluxioSinkConfig.load(map);
+        config.validate();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "rotationInterval must be either -1 or a positive long.")
+    public final void invalidRotationIntervalTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("alluxioMasterHost", "localhost");
+        map.put("alluxioMasterPort", "19998");
+        map.put("alluxioDir", "pulsar");
+        map.put("filePrefix", "TopicA");
+        map.put("fileExtension", ".txt");
+        map.put("lineSeparator", "\n");
+        map.put("rotationRecords", "100");
+        map.put("rotationInterval", "-1000");
+
+        AlluxioSinkConfig config = AlluxioSinkConfig.load(map);
+        config.validate();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "No enum constant alluxio.client.WriteType.NOTSUPPORT")
+    public final void invalidClientModeTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("alluxioMasterHost", "localhost");
+        map.put("alluxioMasterPort", "19998");
+        map.put("alluxioDir", "pulsar");
+        map.put("filePrefix", "TopicA");
+        map.put("fileExtension", ".txt");
+        map.put("lineSeparator", "\n");
+        map.put("rotationRecords", "100");
+        map.put("rotationInterval", "-1");
+        map.put("writeType", "NotSupport");
+
+        AlluxioSinkConfig config = AlluxioSinkConfig.load(map);
+        config.validate();
+
+        WriteType.valueOf(config.getWriteType().toUpperCase());
+    }
+
+
+    private File getFile(String name) {
+        ClassLoader classLoader = getClass().getClassLoader();
+        return new File(classLoader.getResource(name).getFile());
+    }
+}
diff --git a/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkTest.java b/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkTest.java
new file mode 100644
index 00000000000..366f276ae86
--- /dev/null
+++ b/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.alluxio.sink;
+
+import alluxio.AlluxioURI;
+import alluxio.client.WriteType;
+import alluxio.client.file.FileSystem;
+import alluxio.client.file.URIStatus;
+import alluxio.conf.PropertyKey;
+import alluxio.conf.ServerConfiguration;
+import alluxio.master.LocalAlluxioCluster;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SinkContext;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Alluxio Sink test
+ */
+@Slf4j
+public class AlluxioSinkTest {
+
+    @Mock
+    protected SinkContext mockSinkContext;
+
+    protected Map<String, Object> map;
+    protected AlluxioSink sink;
+
+    @Mock
+    protected Record<GenericObject> mockRecord;
+
+    static Schema kvSchema;
+    static Schema<Foobar> valueSchema;
+    static GenericSchema<GenericRecord> genericSchema;
+    static GenericRecord fooBar;
+
+    @BeforeClass
+    public static void init() {
+        valueSchema = Schema.JSON(Foobar.class);
+        genericSchema = Schema.generic(valueSchema.getSchemaInfo());
+        fooBar = genericSchema.newRecordBuilder()
+                .set("name", "foo")
+                .set("address", "foobar")
+                .set("age", 20)
+                .build();
+        kvSchema = Schema.KeyValue(Schema.STRING, genericSchema, KeyValueEncodingType.SEPARATED);
+    }
+
+    @BeforeMethod
+    public final void setUp() {
+        map = new HashMap<>();
+        map.put("alluxioMasterHost", "localhost");
+        map.put("alluxioMasterPort", "19998");
+        map.put("alluxioDir", "/pulsar");
+        map.put("filePrefix", "prefix");
+        map.put("schemaEnable", "true");
+
+        mockRecord = mock(Record.class);
+        mockSinkContext = mock(SinkContext.class);
+
+        when(mockRecord.getKey()).thenAnswer(new Answer<Optional<String>>() {
+            int count = 0;
+            public Optional<String> answer(InvocationOnMock invocation) throws Throwable {
+                return Optional.of( "key-" + count++);
+            }});
+
+        when(mockRecord.getValue()).thenAnswer((Answer<GenericObject>) invocation -> new GenericObject() {
+            @Override
+            public SchemaType getSchemaType() {
+                return SchemaType.KEY_VALUE;
+            }
+
+            @Override
+            public Object getNativeObject() {
+                return new KeyValue<String, GenericObject>((String) fooBar.getField("address"), fooBar);
+            }
+        });
+
+        when(mockRecord.getSchema()).thenAnswer((Answer<Schema<KeyValue<String, Foobar>>>) invocation -> kvSchema);
+    }
+
+    @Test
+    public void openTest() throws Exception {
+        map.put("filePrefix", "TopicA");
+        map.put("fileExtension", ".txt");
+        map.put("lineSeparator", "\n");
+        map.put("rotationRecords", "100");
+
+        String alluxioDir = "/pulsar";
+
+        LocalAlluxioCluster cluster = setupSingleMasterCluster();
+
+        sink = new AlluxioSink();
+        sink.open(map, mockSinkContext);
+
+        FileSystem client = cluster.getClient();
+
+        AlluxioURI alluxioURI = new AlluxioURI(alluxioDir);
+        Assert.assertTrue(client.exists(alluxioURI));
+
+        String alluxioTmpDir = FilenameUtils.concat(alluxioDir, "tmp");
+        AlluxioURI alluxioTmpURI = new AlluxioURI(alluxioTmpDir);
+        Assert.assertTrue(client.exists(alluxioTmpURI));
+
+        sink.close();
+        cluster.stop();
+    }
+
+    @Test
+    public void writeAndCloseTest() throws Exception {
+        map.put("filePrefix", "TopicA");
+        map.put("fileExtension", ".txt");
+        map.put("lineSeparator", "\n");
+        map.put("rotationRecords", "1");
+        map.put("writeType", "THROUGH");
+        map.put("alluxioDir", "/pulsar");
+
+        String alluxioDir = "/pulsar";
+
+        LocalAlluxioCluster cluster = setupSingleMasterCluster();
+
+        sink = new AlluxioSink();
+        sink.open(map, mockSinkContext);
+
+        sink.write(() -> new GenericObject() {
+            @Override
+            public SchemaType getSchemaType() {
+                return SchemaType.KEY_VALUE;
+            }
+
+            @Override
+            public Object getNativeObject() {
+                return new KeyValue<>((String) fooBar.getField("address"), fooBar);
+            }
+        });
+
+        FileSystem client = cluster.getClient();
+
+        AlluxioURI alluxioURI = new AlluxioURI(alluxioDir);
+        Assert.assertTrue(client.exists(alluxioURI));
+
+        String alluxioTmpDir = FilenameUtils.concat(alluxioDir, "tmp");
+        AlluxioURI alluxioTmpURI = new AlluxioURI(alluxioTmpDir);
+        Assert.assertTrue(client.exists(alluxioTmpURI));
+
+        List<URIStatus> listAlluxioDirStatus = client.listStatus(alluxioURI);
+
+        List<String> pathList = listAlluxioDirStatus.stream().map(URIStatus::getPath).collect(Collectors.toList());
+
+        Assert.assertEquals(pathList.size(), 2);
+
+        for (String path : pathList) {
+            if (path.contains("tmp")) {
+                Assert.assertEquals(path, "/pulsar/tmp");
+            } else {
+                Assert.assertTrue(path.startsWith("/pulsar/TopicA-"));
+            }
+        }
+
+        sink.close();
+        cluster.stop();
+    }
+
+    private LocalAlluxioCluster setupSingleMasterCluster() throws Exception {
+        // Setup and start the local alluxio cluster
+        LocalAlluxioCluster cluster = new LocalAlluxioCluster();
+        cluster.initConfiguration(getTestName(getClass().getSimpleName(), LocalAlluxioCluster.DEFAULT_TEST_NAME));
+        ServerConfiguration.set(PropertyKey.USER_FILE_WRITE_TYPE_DEFAULT, WriteType.MUST_CACHE);
+        cluster.start();
+        return cluster;
+    }
+
+    public String getTestName(String className, String methodName) {
+        String testName = className + "-" + methodName;
+        // cannot use these characters in the name/path: . [ ]
+        testName = testName.replace(".", "-");
+        testName = testName.replace("[", "-");
+        testName = testName.replace("]", "");
+        return testName;
+    }
+}
diff --git a/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/Foobar.java b/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/Foobar.java
new file mode 100644
index 00000000000..0a6c44ca9ce
--- /dev/null
+++ b/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/Foobar.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.alluxio.sink;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class Foobar {
+    String name;
+    String address;
+    int age;
+}
diff --git a/pulsar-io/alluxio/src/test/resources/sinkConfig.yaml b/pulsar-io/alluxio/src/test/resources/sinkConfig.yaml
new file mode 100644
index 00000000000..f7907d4e02f
--- /dev/null
+++ b/pulsar-io/alluxio/src/test/resources/sinkConfig.yaml
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+{
+"alluxioMasterHost": "localhost",
+"alluxioMasterPort": "19998",
+"alluxioDir": "pulsar",
+"filePrefix": "TopicA",
+"fileExtension": ".txt",
+"lineSeparator": "\n",
+"rotationRecords": "100",
+"rotationInterval": "-1"
+}
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index a5e096aff59..fb83c6fba19 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -112,6 +112,7 @@
         <module>influxdb</module>
         <module>dynamodb</module>
         <module>nsq</module>
+        <module>alluxio</module>
       </modules>
     </profile>
 
diff --git a/site2/docs/io-alluxio.md b/site2/docs/io-alluxio.md
new file mode 100644
index 00000000000..59c3247f9e6
--- /dev/null
+++ b/site2/docs/io-alluxio.md
@@ -0,0 +1,62 @@
+---
+id: io-alluxio
+title: Alluxio sink connector
+sidebar_label: Alluxio sink connector
+---
+
+## Sink
+
+The Alluxio sink connector pulls messages from Pulsar topics and persists the messages to an Alluxio directory.
+
+## Configuration
+
+The configuration of the Alluxio sink connector has the following properties.
+
+### Property
+
+| Name | Type|Required | Default | Description 
+|------|----------|----------|---------|-------------|
+| `alluxioMasterHost` | String | true | "" (empty string) | The hostname of Alluxio master. |
+| `alluxioMasterPort` | int | true | 19998 | The port that Alluxio master node runs on. |
+| `alluxioDir` | String | true | "" (empty string) | The Alluxio directory from which files should be read from or written to. |
+| `securityLoginUser` | String | false | "" (empty string) | When `alluxio.security.authentication.type` is set to `SIMPLE` or `CUSTOM`, user application uses this property to indicate the user requesting Alluxio service. If it is not set explicitly, the OS login user is used. |
+| `filePrefix` | String | false | "" (empty string) | The prefix of the files to create in the Alluxio directory (e.g. a value of 'TopicA' results in files named topicA-, topicA-, etc being produced). |
+| `fileExtension` | String | false | "" (empty string) | The extension to add to the files written to Alluxio (e.g. '.txt'). |
+| `lineSeparator` | String | false | "" (empty string) | The character used to separate records in a text file. If no value is provided, then the content from all of the records is concatenated together in one continuous byte array. |
+| `rotationRecords` | long | false | 10000 | The number records of Alluxio file rotation. |
+| `rotationInterval` | long | false | -1 | The interval to rotate a Alluxio file (in milliseconds). |
+| `schemaEnable` | boolean | false | false | Sets whether the Sink has to take into account the Schema or if it should simply copy the raw message to Alluxio. |
+| `writeType` | String | false | `MUST_CACHE` | Default write type when creating Alluxio files. Valid options are `MUST_CACHE` (write only goes to Alluxio and must be stored in Alluxio), `CACHE_THROUGH` (try to cache, write to UnderFS synchronously), `THROUGH` (no cache, write to UnderFS synchronously). |
+
+### Example
+
+Before using the Alluxio sink connector, you need to create a configuration file in the path you will start Pulsar service (i.e. `PULSAR_HOME`) through one of the following methods.
+
+* JSON
+
+    ```json
+    {
+        "alluxioMasterHost": "localhost",
+        "alluxioMasterPort": "19998",
+        "alluxioDir": "pulsar",
+        "filePrefix": "TopicA",
+        "fileExtension": ".txt",
+        "lineSeparator": "\n",
+        "rotationRecords": "100",
+        "rotationInterval": "-1"
+    }
+    ```
+
+* YAML
+
+    ```yaml
+    configs:
+        alluxioMasterHost: "localhost"
+        alluxioMasterPort: "19998"
+        alluxioDir: "pulsar"
+        filePrefix: "TopicA"
+        fileExtension: ".txt"
+        lineSeparator: "\n"
+        rotationRecords: 100
+        rotationInterval: "-1"
+    ```
\ No newline at end of file