You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/01/27 02:15:19 UTC

[GitHub] merlimat closed pull request #659: Pulsar connectors

merlimat closed pull request #659: Pulsar connectors
URL: https://github.com/apache/incubator-pulsar/pull/659
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/all/pom.xml b/all/pom.xml
index c45d67426..d993cd934 100644
--- a/all/pom.xml
+++ b/all/pom.xml
@@ -64,6 +64,18 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-connectors-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-connectors-google-cloud</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-log4j12</artifactId>
diff --git a/bin/pulsar b/bin/pulsar
index 70fab6a62..ef9a84c3f 100755
--- a/bin/pulsar
+++ b/bin/pulsar
@@ -75,6 +75,7 @@ where command is one of:
     proxy               Run a pulsar proxy
     websocket           Run a web socket proxy server
     standalone          Run a broker server with local bookies and local zookeeper
+    connector           Run a connector
 
     initialize-cluster-metadata     One-time metadata initialization
     zookeeper-shell     Open a ZK shell client
@@ -231,6 +232,8 @@ elif [ $COMMAND == "initialize-cluster-metadata" ]; then
     exec $JAVA $OPTS org.apache.pulsar.PulsarClusterMetadataSetup $@
 elif [ $COMMAND == "zookeeper-shell" ]; then
     exec $JAVA $OPTS org.apache.zookeeper.ZooKeeperMain $@
+elif [ $COMMAND == "connector" ]; then
+    exec $JAVA $OPTS org.apache.pulsar.connect.cli.Connect $@
 elif [ $COMMAND == "help" ]; then
     pulsar_help;
 else
diff --git a/bin/pulsar-daemon b/bin/pulsar-daemon
index 3658659d6..a8e7e4a8b 100755
--- a/bin/pulsar-daemon
+++ b/bin/pulsar-daemon
@@ -82,6 +82,9 @@ case $command in
     (standalone)
         echo "doing $startStop $command ..."
         ;;
+    (connector)
+        echo "doing $startStop $command ..."
+        ;;
     (*)
         echo "Error: unknown service name $command"
         usage
diff --git a/pom.xml b/pom.xml
index 1f98642e4..4d555256f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,6 +89,7 @@ flexible messaging model and an intuitive client API.</description>
     <module>pulsar-broker-auth-athenz</module>
     <module>pulsar-client-auth-athenz</module>
     <module>pulsar-zookeeper</module>
+    <module>pulsar-connectors</module>
     <module>all</module>
   </modules>
 
diff --git a/pulsar-connectors/core/pom.xml b/pulsar-connectors/core/pom.xml
new file mode 100644
index 000000000..68661efd6
--- /dev/null
+++ b/pulsar-connectors/core/pom.xml
@@ -0,0 +1,48 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <packaging>jar</packaging>
+
+    <parent>
+        <groupId>org.apache.pulsar</groupId>
+        <artifactId>pulsar-connectors</artifactId>
+        <version>1.19-incubating-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>pulsar-connectors-core</artifactId>
+    <name>Pulsar Connectors Core</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-client</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <!-- NOTE(review): all/pom.xml refers to sibling modules with ${project.version}; confirm which form is preferred. -->
+    </dependencies>
+
+    <build>
+        <!-- No module-specific build configuration; Maven inherits the build section from the parent POM. -->
+    </build>
+</project>
diff --git a/pulsar-connectors/core/src/main/java/org/apache/pulsar/common/io/FileSystem.java b/pulsar-connectors/core/src/main/java/org/apache/pulsar/common/io/FileSystem.java
new file mode 100644
index 000000000..f1bb3330d
--- /dev/null
+++ b/pulsar-connectors/core/src/main/java/org/apache/pulsar/common/io/FileSystem.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.io;
+
+import org.apache.pulsar.common.io.fs.ResourceId;
+
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+
+// code based on apache beam io
+public abstract class FileSystem<ResourceIdT extends ResourceId> {
+    /** Returns a channel for writing to {@code resourceId}; implementations may throw {@link IOException}. */
+    protected abstract WritableByteChannel create(
+            ResourceIdT resourceId) throws IOException;
+
+    /**
+     * Returns a new {@link ResourceId} for this filesystem that represents the named resource.
+     * The user supplies both the resource spec and whether it is a directory.
+     *
+     * <p>The supplied {@code singleResourceSpec} is expected to be in a proper format, including
+     * any necessary escaping, for this {@link FileSystem}.
+     *
+     * <p>This function may throw an {@link IllegalArgumentException} if given an invalid argument,
+     * such as when the specified {@code singleResourceSpec} is not a valid resource name.
+     */
+    protected abstract ResourceIdT matchNewResource(String singleResourceSpec, boolean isDirectory);
+
+    /**
+     * Get the URI scheme which defines the namespace of the {@link FileSystem}.
+     *
+     * @see <a href="https://www.ietf.org/rfc/rfc2396.txt">RFC 2396</a>
+     */
+    protected abstract String getScheme();
+}
diff --git a/pulsar-connectors/core/src/main/java/org/apache/pulsar/common/io/FileSystemFactory.java b/pulsar-connectors/core/src/main/java/org/apache/pulsar/common/io/FileSystemFactory.java
new file mode 100644
index 000000000..4390d400b
--- /dev/null
+++ b/pulsar-connectors/core/src/main/java/org/apache/pulsar/common/io/FileSystemFactory.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.io;
+
+import java.util.Properties;
+
+public interface FileSystemFactory {
+    /** Builds a FileSystem from {@code properties}; FileSystems.register calls this on each factory found via ServiceLoader. */
+    FileSystem create(Properties properties);
+}
diff --git a/pulsar-connectors/core/src/main/java/org/apache/pulsar/common/io/FileSystems.java b/pulsar-connectors/core/src/main/java/org/apache/pulsar/common/io/FileSystems.java
new file mode 100644
index 000000000..d0cf63278
--- /dev/null
+++ b/pulsar-connectors/core/src/main/java/org/apache/pulsar/common/io/FileSystems.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.io;
+
+import org.apache.pulsar.common.io.fs.ResourceId;
+import org.apache.pulsar.common.util.ReflectionHelper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+// code based on apache beam io
+public class FileSystems {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileSystems.class);
+
+    private static final Pattern FILE_SCHEME_PATTERN =
+            Pattern.compile("(?<scheme>[a-zA-Z][-a-zA-Z0-9+.]*):.*");
+
+    private static final AtomicReference<Map<String, FileSystem>> SCHEME_TO_FILESYSTEM =
+            new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>()));
+
+    /** Opens a channel on the filesystem registered for {@code resourceId.getScheme()}; IllegalStateException if none. */
+    public static WritableByteChannel create(ResourceId resourceId) throws IOException {
+        return getFileSystemInternal(resourceId.getScheme()).create(resourceId);
+    }
+    /** Resolves the spec on the filesystem chosen by its scheme prefix ("file" when the spec has none). */
+    public static ResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
+        return getFileSystemInternal(parseScheme(singleResourceSpec))
+                .matchNewResource(singleResourceSpec, isDirectory);
+    }
+    /** Rebuilds the scheme map (local filesystem plus ServiceLoader factories) and swaps it in with one AtomicReference.set. */
+    public static void register(Properties properties) {
+        final Map<String, FileSystem> filesystems = new HashMap<>();
+        // local
+        FileSystem fs = new LocalFileSystemFactory().create(properties);
+        filesystems.put(fs.getScheme(), fs);
+
+        // load other file systems
+        for (FileSystemFactory factory : getFactories()) {
+            fs = factory.create(properties);
+            if (filesystems.containsKey(fs.getScheme())) {
+                throw new IllegalStateException(String.format(
+                        "Scheme: [%s] has conflicting filesystems: [%s]",
+                        fs.getScheme(),
+                        fs.getClass().getName()));
+            }
+            filesystems.put(fs.getScheme(), fs);
+        }
+
+        SCHEME_TO_FILESYSTEM.set(Collections.unmodifiableMap(filesystems));
+    }
+    /** Extracts the lower-cased scheme prefix of {@code spec}, defaulting to "file" when the spec has no scheme. */
+    private static String parseScheme(String spec) {
+        // The spec is almost, but not quite, a URI. In particular,
+        // the reserved characters '[', ']', and '?' have meanings that differ
+        // from their use in the URI spec. ('*' is not reserved).
+        // Here, we just need the scheme, which is so circumscribed as to be
+        // very easy to extract with a regex.
+        final Matcher matcher = FILE_SCHEME_PATTERN.matcher(spec);
+
+        if (!matcher.matches()) {
+            return "file";
+        } else {
+            return matcher.group("scheme").toLowerCase(Locale.ROOT); // locale-independent: "FILE" must not become "fıle"
+        }
+    }
+    /** Looks up the filesystem for {@code scheme}, ignoring case; throws IllegalStateException when none is registered. */
+    private static FileSystem getFileSystemInternal(String scheme) {
+        String lowerCaseScheme = scheme.toLowerCase(Locale.ROOT); // Locale.ROOT: do not depend on the JVM default locale
+        final Map<String, FileSystem> schemeToFileSystem = SCHEME_TO_FILESYSTEM.get();
+        FileSystem fileSystem = schemeToFileSystem.get(lowerCaseScheme);
+        if (fileSystem != null) {
+            return fileSystem;
+        }
+
+        throw new IllegalStateException("Unable to find registrar for " + scheme);
+    }
+
+    /** Collects every FileSystemFactory that ServiceLoader finds on the class loader from ReflectionHelper. */
+    private static List<FileSystemFactory> getFactories() {
+        final List<FileSystemFactory> factories = new ArrayList<>();
+        Iterator<FileSystemFactory> iterator = ServiceLoader.load(FileSystemFactory.class,
+                ReflectionHelper.findClassLoader()).iterator();
+
+        while(iterator.hasNext()) {
+            factories.add(iterator.next());
+        }
+
+        return factories;
+    }
+
+    private FileSystems() {}
+}
diff --git a/pulsar-connectors/core/src/main/java/org/apache/pulsar/common/io/LocalFileSystem.java b/pulsar-connectors/core/src/main/java/org/apache/pulsar/common/io/LocalFileSystem.java
new file mode 100644
index 000000000..597694252
--- /dev/null
+++ b/pulsar-connectors/core/src/main/java/org/apache/pulsar/common/io/LocalFileSystem.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.io;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+// code based on apache beam io
+class LocalFileSystem extends FileSystem<LocalResourceId> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystem.class);
+
+    LocalFileSystem() {
+    }
+    /** Creates missing parent directories, then opens a buffered channel writing to the file (no append flag is passed). */
+    @Override
+    protected WritableByteChannel create(LocalResourceId resourceId)
+            throws IOException {
+        LOG.debug("creating file {}", resourceId);
+        File absoluteFile = resourceId.getPath().toFile().getAbsoluteFile();
+        if (absoluteFile.getParentFile() != null
+                && !absoluteFile.getParentFile().exists()
+                && !absoluteFile.getParentFile().mkdirs()
+                && !absoluteFile.getParentFile().exists()) { // re-check: mkdirs() returns false if the directory appeared meanwhile
+            throw new IOException("Unable to create parent directories for '" + resourceId + "'");
+        }
+        return Channels.newChannel(
+                new BufferedOutputStream(new FileOutputStream(absoluteFile)));
+    }
+    // NOTE(review): the spec reaches Paths.get unchanged, so an explicit "file:" prefix is not stripped — confirm callers pass bare paths
+    @Override
+    protected LocalResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
+        Path path = Paths.get(singleResourceSpec);
+        return LocalResourceId.fromPath(path, isDirectory);
+    }
+    /** Scheme under which FileSystems.register stores this filesystem. */
+    @Override
+    protected String getScheme() {
+        return "file";
+    }
+}
diff --git a/pulsar-connectors/core/src/main/java/org/apache/pulsar/common/io/LocalFileSystemFactory.java b/pulsar-connectors/core/src/main/java/org/apache/pulsar/common/io/LocalFileSystemFactory.java
new file mode 100644
index 000000000..b1cb79acf
--- /dev/null
+++ b/pulsar-connectors/core/src/main/java/org/apache/pulsar/common/io/LocalFileSystemFactory.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.io;
+
+import java.util.Properties;
+
+class LocalFileSystemFactory implements FileSystemFactory {
+    /** Returns a new {@link LocalFileSystem} on every call; {@code properties} is not read. */
+    @Override
+    public FileSystem create(Properties properties) {
+        return new LocalFileSystem();
+    }
+}
diff --git a/pulsar-connectors/core/src/main/java/org/apache/pulsar/common/io/LocalResourceId.java b/pulsar-connectors/core/src/main/java/org/apache/pulsar/common/io/LocalResourceId.java
new file mode 100644
index 000000000..1367c53c8
--- /dev/null
+++ b/pulsar-connectors/core/src/main/java/org/apache/pulsar/common/io/LocalResourceId.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.io;
+
+import org.apache.pulsar.common.io.fs.ResourceId;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Objects;
+
+// code based on apache beam io
+class LocalResourceId implements ResourceId {
+
+    private final String pathString;
+    private transient volatile Path cachedPath;
+    private final boolean isDirectory;
+    /** Factory method; rejects a null {@code path} with a NullPointerException whose message is "path". */
+    static LocalResourceId fromPath(Path path, boolean isDirectory) {
+        Objects.requireNonNull(path, "path");
+        return new LocalResourceId(path, isDirectory);
+    }
+    /** Stores the absolute, normalized path as a string, with a trailing separator appended for directories. */
+    private LocalResourceId(Path path, boolean isDirectory) {
+        this.pathString = path.toAbsolutePath().normalize().toString()
+                + (isDirectory ? File.separatorChar : "");
+        this.isDirectory = isDirectory;
+    }
+
+    @Override
+    public String getFilename() {
+        Path fileName = getPath().getFileName();
+        return fileName == null ? null : fileName.toString();
+    }
+
+    @Override
+    public String getScheme() {
+        return "file";
+    }
+
+    @Override
+    public boolean isDirectory() {
+        return isDirectory;
+    }
+    /** Lazily rebuilds the Path from pathString and caches it in the volatile field (no locking). */
+    Path getPath() {
+        if (cachedPath == null) {
+            cachedPath = Paths.get(pathString);
+        }
+        return cachedPath;
+    }
+
+    @Override
+    public String toString() {
+        return pathString;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof LocalResourceId)) {
+            return false;
+        }
+        LocalResourceId other = (LocalResourceId) obj;
+        return this.pathString.equals(other.pathString)
+                && this.isDirectory == other.isDirectory;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(pathString, isDirectory);
+    }
+}
diff --git a/pulsar-connectors/core/src/main/java/org/apache/pulsar/common/io/fs/ResourceId.java b/pulsar-connectors/core/src/main/java/org/apache/pulsar/common/io/fs/ResourceId.java
new file mode 100644
index 000000000..a5e0505e2
--- /dev/null
+++ b/pulsar-connectors/core/src/main/java/org/apache/pulsar/common/io/fs/ResourceId.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.io.fs;
+
+// code based on apache beam io
+public interface ResourceId {
+    // Implemented in this module by LocalResourceId for the "file" scheme.
+    /**
+     * Get the scheme which defines the namespace of the {@link ResourceId}.
+     *
+     * <p>The scheme is required to follow URI scheme syntax. See
+     * <a href="https://www.ietf.org/rfc/rfc2396.txt">RFC 2396</a>
+     */
+    String getScheme();
+
+
+    /**
+     * Returns the name of the file or directory denoted by this {@code ResourceId}. The file name
+     * is the farthest element from the root in the directory hierarchy.
+     *
+     * @return the name of the file or directory, or {@code null} if the path has zero
+     * components.
+     */
+    String getFilename();
+
+    /**
+     * Returns {@code true} if this {@link ResourceId} represents a directory, {@code false} otherwise.
+     */
+    boolean isDirectory();
+
+    /**
+     * Returns the string representation of this {@link ResourceId}.
+     */
+    String toString();
+}
diff --git a/pulsar-connectors/core/src/main/java/org/apache/pulsar/common/io/util/IoUtils.java b/pulsar-connectors/core/src/main/java/org/apache/pulsar/common/io/util/IoUtils.java
new file mode 100644
index 000000000..4fd77cad0
--- /dev/null
+++ b/pulsar-connectors/core/src/main/java/org/apache/pulsar/common/io/util/IoUtils.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.io.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public final class IoUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(IoUtils.class);
+    /** Closes {@code closeable} if non-null; any exception from close() is logged at WARN and not rethrown. */
+    public static void closeSilently(Closeable closeable) {
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (Exception e) {
+                LOG.warn("Failed to close {}", closeable, e);
+            }
+        }
+    }
+    /** Closes {@code closeable} if non-null, letting an {@link IOException} from close() propagate. */
+    public static void close(Closeable closeable) throws IOException {
+        if (closeable != null) {
+            closeable.close();
+        }
+    }
+
+    private IoUtils() {}
+}
diff --git a/pulsar-connectors/core/src/main/java/org/apache/pulsar/common/util/ReflectionHelper.java b/pulsar-connectors/core/src/main/java/org/apache/pulsar/common/util/ReflectionHelper.java
new file mode 100644
index 000000000..1a6b73819
--- /dev/null
+++ b/pulsar-connectors/core/src/main/java/org/apache/pulsar/common/util/ReflectionHelper.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+public class ReflectionHelper {
+    /** Returns the thread context class loader, else this class's own loader, else the system class loader. */
+    public static ClassLoader findClassLoader() {
+        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+        if (classLoader == null) {
+            classLoader = ReflectionHelper.class.getClassLoader();
+        }
+        if (classLoader == null) {
+            classLoader = ClassLoader.getSystemClassLoader();
+        }
+        return classLoader;
+    }
+
+    private ReflectionHelper() {}
+}
diff --git a/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/api/Connector.java b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/api/Connector.java
new file mode 100644
index 000000000..33e97acea
--- /dev/null
+++ b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/api/Connector.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.connect.api;
+
+import java.util.Properties;
+
+public interface Connector {
+    // NOTE(review): presumably invoked once with the loaded configuration before use — confirm against ConnectorRunner
+    void initialize(Properties properties);
+    // NOTE(review): declares no checked exception, unlike java.io.Closeable#close — confirm this is intended
+    void close();
+}
diff --git a/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/api/sink/SinkConnector.java b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/api/sink/SinkConnector.java
new file mode 100644
index 000000000..89f19a29f
--- /dev/null
+++ b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/api/sink/SinkConnector.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.connect.api.sink;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.connect.api.Connector;
+
+import java.io.IOException;
+
+public abstract class SinkConnector implements Connector {
+
+    // processes a single message; the method is void, so whether to ack is not signalled by a return value
+    public abstract void processMessage(Message message) throws IOException;
+
+    // called before an ack is sent
+    // implementations should save their state here
+    public abstract void commit() throws Exception;
+}
diff --git a/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/cli/Connect.java b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/cli/Connect.java
new file mode 100644
index 000000000..b1d4fd2b9
--- /dev/null
+++ b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/cli/Connect.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.connect.cli;
+
+import org.apache.pulsar.connect.runtime.ConnectorExecutionException;
+import org.apache.pulsar.connect.runtime.ConnectorRunner;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+public final class Connect {
+    private static final Logger LOG = LoggerFactory.getLogger(Connect.class);
+    /** Loads the properties file named by the single argument, logs it, and hands it to ConnectorRunner. */
+    public static void main(String[] args) {
+        if (args.length != 1) {
+            System.err.println("Must provide a configuration file.");
+            System.exit(1);
+        }
+
+        final Properties properties = new Properties();
+        try (FileInputStream input = new FileInputStream(args[0])) { // stream is closed even when load() fails
+            properties.load(input);
+        } catch (IOException ioe) {
+            throw new ConnectorExecutionException(ioe);
+        }
+
+        // pretty print the configuration (NOTE(review): values are logged verbatim — confirm the file holds no secrets)
+        final StringBuilder sb = new StringBuilder();
+        for (Map.Entry<Object, Object> entry: properties.entrySet()) {
+            sb.append(entry.getKey()).append(": ")
+                    .append(entry.getValue()).append("\n");
+        }
+        LOG.info("running connect with properties\n\n{}", sb);
+
+        final ConnectorRunner runner = ConnectorRunner.fromProperties(properties);
+        runner.run();
+    }
+}
diff --git a/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/config/ConnectorConfiguration.java b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/config/ConnectorConfiguration.java
new file mode 100644
index 000000000..d3bc2a4b4
--- /dev/null
+++ b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/config/ConnectorConfiguration.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.connect.config;
+
+/**
+ * Property keys and default values shared by the connector runtime.
+ * Holds constants only and cannot be instantiated.
+ */
+public class ConnectorConfiguration {
+
+    private ConnectorConfiguration() {}
+
+    /** Default operation timeout in seconds (five minutes). */
+    public static final int DEFAULT_OPERATION_TIMEOUT_SECONDS = 5 * 60;
+
+    /** Key for the service URL (presumably of the Pulsar broker; its reader is not in this file). */
+    public static final String KEY_SERVICE_URL = "pulsar.url";
+
+    /** Default value paired with {@link #KEY_SERVICE_URL}. */
+    public static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650";
+
+    /** Key for the class name of the connector to run. */
+    public static final String KEY_CONNECTOR = "connector";
+
+    /** Key for the topic the connector works on. */
+    public static final String KEY_TOPIC = "topic";
+
+    /** Key for the subscription name used when consuming the topic. */
+    public static final String KEY_SUBSCRIPTION = "subscription";
+
+    /** Key for the commit interval, expressed in megabytes of processed data. */
+    public static final String KEY_COMMIT_INTERVAL_MB = "commit.interval.bytes.mb";
+}
diff --git a/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/runtime/ConnectorExecutionException.java b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/runtime/ConnectorExecutionException.java
new file mode 100644
index 000000000..be0960b7c
--- /dev/null
+++ b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/runtime/ConnectorExecutionException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.connect.runtime;
+
+/**
+ * Unchecked exception signalling that a connector could not be set up or run.
+ */
+public class ConnectorExecutionException extends RuntimeException {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param message description of the failure
+     */
+    public ConnectorExecutionException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param cause underlying failure being wrapped
+     */
+    public ConnectorExecutionException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Allows a failure to be described without losing its underlying cause.
+     *
+     * @param message description of the failure
+     * @param cause   underlying failure being wrapped
+     */
+    public ConnectorExecutionException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git a/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/runtime/ConnectorRunner.java b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/runtime/ConnectorRunner.java
new file mode 100644
index 000000000..6b3318b3c
--- /dev/null
+++ b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/runtime/ConnectorRunner.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.connect.runtime;
+
+import org.apache.pulsar.connect.api.sink.SinkConnector;
+import org.apache.pulsar.connect.config.ConnectorConfiguration;
+import org.apache.pulsar.connect.util.PropertiesValidator;
+
+import java.util.Properties;
+
+/**
+ * Base type of the objects that execute a connector. Use
+ * {@link #fromProperties(Properties)} to obtain the runner matching the
+ * configured connector class.
+ */
+public abstract class ConnectorRunner {
+
+    /**
+     * Creates the runner for the connector class named in the properties.
+     * Only classes assignable to {@link SinkConnector} are supported.
+     *
+     * @param properties configuration; must contain the topic and connector keys
+     * @return a runner for the configured connector
+     * @throws IllegalStateException if a required key is missing
+     * @throws ConnectorExecutionException if the connector class cannot be
+     *         loaded or is not a supported connector type
+     */
+    public static ConnectorRunner fromProperties(Properties properties) {
+        PropertiesValidator.validateThrowIfMissingKeys(properties,
+                ConnectorConfiguration.KEY_TOPIC,
+                ConnectorConfiguration.KEY_CONNECTOR);
+
+        final String connectorClass = properties.getProperty(ConnectorConfiguration.KEY_CONNECTOR);
+
+        final Class<?> resolved;
+        try {
+            resolved = Class.forName(connectorClass);
+        } catch (ClassNotFoundException cnfe) {
+            throw new ConnectorExecutionException(cnfe);
+        }
+
+        if (SinkConnector.class.isAssignableFrom(resolved)) {
+            return SinkConnectorRunner.fromProperties(properties);
+        }
+
+        // same exception type as every other set-up failure (still a RuntimeException)
+        throw new ConnectorExecutionException("unknown connector class [ " + connectorClass + "]");
+    }
+
+    /**
+     * Executes the connector.
+     */
+    public abstract void run();
+}
diff --git a/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/runtime/SinkConnectorRunner.java b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/runtime/SinkConnectorRunner.java
new file mode 100644
index 000000000..bbaf55526
--- /dev/null
+++ b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/runtime/SinkConnectorRunner.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.connect.runtime;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.io.FileSystems;
+import org.apache.pulsar.connect.api.sink.SinkConnector;
+import org.apache.pulsar.connect.config.ConnectorConfiguration;
+import org.apache.pulsar.connect.util.Bytes;
+import org.apache.pulsar.connect.util.ConfigUtils;
+import org.apache.pulsar.connect.util.InstanceBuilder;
+import org.apache.pulsar.connect.util.PropertiesValidator;
+import org.apache.pulsar.connect.util.PulsarUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Runs a {@link SinkConnector}: subscribes to the configured topic, hands
+ * every received message to the connector and, each time the configured
+ * amount of data has been processed, commits the connector and then
+ * acknowledges the messages handled since the previous commit.
+ */
+class SinkConnectorRunner extends ConnectorRunner {
+    private static final Logger LOG = LoggerFactory.getLogger(SinkConnectorRunner.class);
+
+    private static final long DEFAULT_ACK_INTERVAL_MB = 5;
+
+    private final SinkConnector connector;
+    private final Properties properties;
+
+    // commit threshold in BYTES (the configured value is in megabytes)
+    private final long commitIntervalBytes;
+
+    // loop guard; nothing in this class clears it, so the loop ends only on an exception
+    private boolean keepGoing = true;
+
+    private SinkConnectorRunner(SinkConnector connector, Properties properties) {
+        this.connector = connector;
+        this.properties = properties;
+        final long intervalMb = ConfigUtils.getLong(properties,
+                ConnectorConfiguration.KEY_COMMIT_INTERVAL_MB, DEFAULT_ACK_INTERVAL_MB);
+        commitIntervalBytes = intervalMb * Bytes.MB;
+    }
+
+    /**
+     * Creates the client, initializes the connector, subscribes with a
+     * failover subscription and processes messages until a failure occurs.
+     * The connector is always closed before this method returns.
+     *
+     * @throws ConnectorExecutionException on any client or processing failure
+     */
+    @Override
+    public void run() {
+        try (PulsarClient client = createClient(properties)) {
+            // give the connector its configuration before any message arrives
+            connector.initialize(properties);
+
+            final String topic = getProperty(ConnectorConfiguration.KEY_TOPIC);
+            final String subscription = getProperty(ConnectorConfiguration.KEY_SUBSCRIPTION);
+            final ConsumerConfiguration configuration =
+                    new ConsumerConfiguration().setSubscriptionType(SubscriptionType.Failover);
+
+            // create a subscription and start processing messages
+            try (Consumer consumer = client.subscribe(topic, subscription, configuration)) {
+                LOG.info("Running sink connector {} for topic {} and subscription {}",
+                        connector.getClass().getSimpleName(), topic, subscription);
+                LOG.info("acknowledgement interval {} Mb", Bytes.toMb(commitIntervalBytes));
+
+                runSinkConnector(consumer);
+
+            } catch (PulsarClientException pce) {
+                // a failure, so log at error level and keep the stack trace
+                LOG.error("unable to subscribe to topic {} "
+                        + "with subscription {}", topic, subscription, pce);
+                throw new ConnectorExecutionException(pce);
+            }
+        } catch (PulsarClientException pce) {
+            throw new ConnectorExecutionException(pce);
+        } finally {
+            connector.close();
+        }
+    }
+
+    /**
+     * Receive/process loop. Message ids are collected until the processed
+     * payload size reaches the commit threshold; the connector is then
+     * committed and the collected ids are acknowledged.
+     *
+     * @throws ConnectorExecutionException wrapping any failure in the loop
+     */
+    private void runSinkConnector(Consumer consumer) {
+        long bytesProcessed = 0;
+        final List<MessageId> messageIds = new ArrayList<>();
+        while (keepGoing) {
+            try {
+                final Message currentMessage = consumer.receive();
+                messageIds.add(currentMessage.getMessageId());
+
+                connector.processMessage(currentMessage);
+                bytesProcessed += currentMessage.getData().length;
+
+                // TODO add an option for a flush interval
+                // is it time to acknowledge?
+                if (bytesProcessed >= commitIntervalBytes) {
+                    // commit first: messages are acknowledged only once the sink has them
+                    connector.commit();
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("acknowledging {} messages", messageIds.size());
+                    }
+                    acknowledge(consumer, messageIds);
+                    bytesProcessed = 0;
+
+                    // clear ids since we just acknowledged
+                    messageIds.clear();
+                }
+            } catch (Exception ex) {
+                throw new ConnectorExecutionException(ex);
+            }
+        }
+    }
+
+    /**
+     * Acknowledges every given message and waits for all acknowledgements to
+     * complete. A failed acknowledgement surfaces as an unchecked
+     * {@link java.util.concurrent.CompletionException}.
+     */
+    private void acknowledge(Consumer consumer, List<MessageId> messageIds)
+            throws PulsarClientException {
+        // issue all acknowledgements first so they are not serialized ...
+        final List<java.util.concurrent.CompletableFuture<Void>> pending =
+                new ArrayList<>(messageIds.size());
+        for (MessageId messageId : messageIds) {
+            pending.add(consumer.acknowledgeAsync(messageId));
+        }
+        // ... then wait, so a failure is noticed before the ids are discarded
+        for (java.util.concurrent.CompletableFuture<Void> ack : pending) {
+            ack.join();
+        }
+    }
+
+    private String getProperty(String key) {
+        return properties.getProperty(key);
+    }
+
+    private PulsarClient createClient(Properties properties) throws PulsarClientException {
+        return PulsarUtils.createClient(properties);
+    }
+
+    /**
+     * Builds a runner for the sink connector class named in the properties.
+     *
+     * @param properties configuration; must contain the subscription key
+     * @return a runner wrapping a new instance of the configured connector
+     * @throws IllegalStateException if the subscription key is missing
+     * @throws ConnectorExecutionException if the connector class cannot be found
+     */
+    public static SinkConnectorRunner fromProperties(Properties properties) {
+        PropertiesValidator.validateThrowIfMissingKeys(properties,
+                ConnectorConfiguration.KEY_SUBSCRIPTION);
+
+        FileSystems.register(properties);
+
+        final SinkConnector connector;
+        try {
+            final String sinkConnectorClass =
+                    properties.getProperty(ConnectorConfiguration.KEY_CONNECTOR);
+            connector = InstanceBuilder
+                    .ofType(SinkConnector.class)
+                    .fromClassName(sinkConnectorClass)
+                    .build();
+        } catch (ClassNotFoundException e) {
+            throw new ConnectorExecutionException(e);
+        }
+
+        return new SinkConnectorRunner(connector, properties);
+    }
+}
diff --git a/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/sink/PrintSinkConnector.java b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/sink/PrintSinkConnector.java
new file mode 100644
index 000000000..8af099fd6
--- /dev/null
+++ b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/sink/PrintSinkConnector.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.connect.sink;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.connect.api.sink.SinkConnector;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Properties;
+
+/**
+ * Sink connector that prints every message to standard output as
+ * {@code [<message id>] <payload>}.
+ */
+public class PrintSinkConnector extends SinkConnector {
+
+    private static final String OUTPUT_FORMAT = "[%s] %s";
+
+    private PrintStream stream = System.out;
+
+    /**
+     * Points the connector at standard output; the properties are not read.
+     */
+    @Override
+    public void initialize(Properties properties) {
+        stream = System.out;
+    }
+
+    /**
+     * Drops the reference to the output stream. {@code System.out} itself is
+     * not closed.
+     */
+    @Override
+    public void close() {
+        stream = null;
+    }
+
+    /**
+     * Prints the message id followed by the payload decoded as UTF-8.
+     *
+     * @param message message to print
+     */
+    @Override
+    public void processMessage(Message message) throws IOException {
+        // explicit charset: new String(byte[]) would use the platform default,
+        // making the output depend on the machine the connector runs on
+        final String payload = new String(message.getData(),
+                java.nio.charset.StandardCharsets.UTF_8);
+        final String output = String.format(OUTPUT_FORMAT,
+                message.getMessageId().toString(),
+                payload);
+        stream.println(output);
+    }
+
+    /**
+     * Nothing is buffered, so there is nothing to commit.
+     */
+    @Override
+    public void commit() throws Exception {
+        // do nothing
+    }
+}
diff --git a/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/sink/fs/BaseWriter.java b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/sink/fs/BaseWriter.java
new file mode 100644
index 000000000..f580c8734
--- /dev/null
+++ b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/sink/fs/BaseWriter.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.connect.sink.fs;
+
+import org.apache.pulsar.common.io.FileSystems;
+import org.apache.pulsar.common.io.fs.ResourceId;
+import org.apache.pulsar.common.io.util.IoUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+
+/**
+ * Base {@link Writer} that manages the output stream: it is created on
+ * {@link #open(String)} and released on {@link #close()}. Subclasses supply
+ * {@code write} and obtain the stream through {@link #getStream()}.
+ */
+public abstract class BaseWriter implements Writer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BaseWriter.class);
+
+    private OutputStream stream;
+
+    /**
+     * Resolves the path with {@code FileSystems.matchNewResource(path, false)},
+     * creates the resource and wraps the resulting channel in an output stream.
+     *
+     * @param path location of the resource to create
+     * @throws IOException if the writer is already open or creation fails
+     */
+    @Override
+    public void open(String path) throws IOException {
+        if (isOpen()) {
+            throw new IOException("writer is already open");
+        }
+        final ResourceId resourceId = FileSystems.matchNewResource(path, false);
+        stream = Channels.newOutputStream(FileSystems.create(resourceId));
+    }
+
+    /**
+     * Closes the stream. The writer reports itself as not open afterwards,
+     * even when closing the stream fails.
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            IoUtils.close(stream);
+        } finally {
+            // clear the reference even on failure so isOpen() cannot keep
+            // returning true for a stream that is no longer usable
+            stream = null;
+        }
+    }
+
+    /**
+     * Flushes the stream; does nothing when the writer is not open.
+     */
+    @Override
+    public void flush() throws IOException {
+        if (isOpen()) {
+            getStream().flush();
+        }
+    }
+
+    @Override
+    public boolean isOpen() {
+        return stream != null;
+    }
+
+    /**
+     * @return the current output stream, or {@code null} when the writer is
+     *         not open; callers can guard with {@link #throwIfNotOpen()}
+     */
+    protected OutputStream getStream() {
+        // TODO check for null stream?
+        return stream;
+    }
+
+    /**
+     * @throws IOException if the writer is not open
+     */
+    protected void throwIfNotOpen() throws IOException {
+        if (!isOpen()) {
+            throw new IOException("writer has not been opened.");
+        }
+    }
+}
diff --git a/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/sink/fs/BytesWriter.java b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/sink/fs/BytesWriter.java
new file mode 100644
index 000000000..4c7142bd5
--- /dev/null
+++ b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/sink/fs/BytesWriter.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.connect.sink.fs;
+
+import org.apache.pulsar.client.api.Message;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Writer that stores each message as a length-prefixed record: a 4-byte
+ * payload length (big-endian, the {@link ByteBuffer} default byte order)
+ * followed by the payload bytes.
+ */
+public class BytesWriter extends BaseWriter {
+
+    // reused for every record to avoid allocating a new header per message
+    private final ByteBuffer header = ByteBuffer.allocate(4);
+
+    /**
+     * Appends one length-prefixed record holding the message payload.
+     *
+     * @param message message whose payload is written
+     * @throws IOException if the writer is not open or the write fails
+     */
+    @Override
+    public void write(Message message) throws IOException {
+        // report a clear error instead of a NullPointerException on the stream
+        throwIfNotOpen();
+
+        final byte[] data = message.getData();
+        header.clear();
+        header.putInt(data.length);
+
+        final OutputStream stream = getStream();
+        stream.write(header.array());
+        // reuse the array read above so the length prefix and payload always agree
+        stream.write(data);
+    }
+}
diff --git a/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/sink/fs/FileSinkConnector.java b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/sink/fs/FileSinkConnector.java
new file mode 100644
index 000000000..17d79e70d
--- /dev/null
+++ b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/sink/fs/FileSinkConnector.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.connect.sink.fs;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.common.io.util.IoUtils;
+import org.apache.pulsar.connect.api.sink.SinkConnector;
+import org.apache.pulsar.connect.config.ConnectorConfiguration;
+import org.apache.pulsar.connect.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Properties;
+
+/**
+ * Sink connector that writes messages to files named
+ * {@code <basepath>/<yyyyMMdd>/output-<HH-mm-ss>}.
+ *
+ * <p>A file is opened lazily when the first message after a commit arrives and
+ * is flushed and closed on {@link #commit()} or {@link #close()}.
+ *
+ * <p>NOTE(review): file names have one-second resolution, so two files opened
+ * within the same second get the same URI. What happens then depends on the
+ * file system implementation - confirm.
+ */
+public class FileSinkConnector extends SinkConnector {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileSinkConnector.class);
+
+    private static final String KEY_BASE_PATH = "basepath";
+
+    private static final String DEFAULT_OUTPUT_FILE_PREFIX = "output";
+    private static final String DEFAULT_DATE_FORMAT = "yyyyMMdd";
+    private static final String DEFAULT_TIME_FORMAT = "HH-mm-ss";
+
+    private final DateFormat dateFormat = new SimpleDateFormat(DEFAULT_DATE_FORMAT);
+    private final DateFormat timeFormat = new SimpleDateFormat(DEFAULT_TIME_FORMAT);
+
+    private String basePath;
+    private String topic;
+    private String subscription;
+
+    // state of the file currently being written; all three are reset on commit
+    private Writer writer;
+    private long bytesWritten = 0;
+    private String fileUri;
+
+    /**
+     * Reads the topic, subscription and base path from the properties.
+     *
+     * @throws IllegalStateException if the {@code basepath} property is missing
+     */
+    @Override
+    public void initialize(Properties properties) {
+        topic = properties.getProperty(ConnectorConfiguration.KEY_TOPIC);
+        subscription = properties.getProperty(ConnectorConfiguration.KEY_SUBSCRIPTION);
+        basePath = properties.getProperty(KEY_BASE_PATH);
+        if (basePath == null) {
+            // fail at start-up rather than with a NullPointerException on the first message
+            throw new IllegalStateException("missing property: " + KEY_BASE_PATH);
+        }
+    }
+
+    /**
+     * Writes the message to the current file, opening a new file if none is open.
+     */
+    @Override
+    public void processMessage(Message message) throws IOException {
+        getWriterAndOpenIfNecessary().write(message);
+
+        bytesWritten += message.getData().length;
+    }
+
+    /**
+     * Flushes and closes the current file, if any.
+     */
+    @Override
+    public void commit() throws Exception {
+        commitAndReset();
+    }
+
+    /**
+     * Commits the current file; a failure is logged and not rethrown.
+     */
+    @Override
+    public void close() {
+        try {
+            commitAndReset();
+        } catch (IOException e) {
+            LOG.warn("failed to commit file when closing", e);
+        }
+    }
+
+    private String createFileUri() {
+        final String base = basePath.endsWith("/") ? basePath : basePath + "/";
+        // one Date for both parts so date and time cannot straddle midnight
+        final Date date = new Date();
+        return base + dateFormat.format(date) + "/" +
+                DEFAULT_OUTPUT_FILE_PREFIX + "-" +
+                timeFormat.format(date);
+    }
+
+    private Writer getWriterAndOpenIfNecessary() throws IOException {
+        if (writer == null) {
+            // compute the URI once: the logged, recorded and opened names must match
+            final String uri = createFileUri();
+            LOG.info("opening file {}", uri);
+
+            final Writer opened = new BytesWriter();
+            opened.open(uri);
+
+            // publish only after open() succeeded, so a failed open does not
+            // leave an unopened writer behind for the next message
+            writer = opened;
+            fileUri = uri;
+        }
+
+        return writer;
+    }
+
+    private void commitAndReset() throws IOException {
+        final Writer current = writer;
+        final String committedUri = fileUri;
+        final long committedBytes = bytesWritten;
+
+        // reset first so the connector starts a fresh file whatever happens below
+        writer = null;
+        bytesWritten = 0;
+        fileUri = null;
+
+        if (current == null) {
+            return;
+        }
+
+        try {
+            current.flush();
+        } finally {
+            // release the underlying resource even when the flush fails
+            IoUtils.close(current);
+        }
+        LOG.info("file {} committed size {} MB", committedUri, Bytes.toMb(committedBytes));
+    }
+}
diff --git a/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/sink/fs/Writer.java b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/sink/fs/Writer.java
new file mode 100644
index 000000000..5d2494bc9
--- /dev/null
+++ b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/sink/fs/Writer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.connect.sink.fs;
+
+import org.apache.pulsar.client.api.Message;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Destination that {@link Message}s are written to. Being {@link Closeable},
+ * a writer can be used with try-with-resources.
+ */
+public interface Writer extends Closeable {
+
+    /**
+     * Opens the writer on the given path.
+     *
+     * @param path location to write to
+     * @throws IOException if the destination cannot be opened
+     */
+    void open(String path) throws IOException;
+
+    /**
+     * Writes one message.
+     *
+     * @param message message to write
+     * @throws IOException if the write fails
+     */
+    void write(Message message) throws IOException;
+
+    /**
+     * Flushes any output held by the writer.
+     *
+     * @throws IOException if the flush fails
+     */
+    void flush() throws IOException;
+
+    /**
+     * @return {@code true} if the writer is currently open
+     */
+    boolean isOpen();
+}
diff --git a/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/util/Bytes.java b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/util/Bytes.java
new file mode 100644
index 000000000..bbb1bdde0
--- /dev/null
+++ b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/util/Bytes.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.connect.util;
+
+/**
+ * Byte-size helpers. Static members only; cannot be instantiated.
+ */
+public class Bytes {
+
+    private Bytes() {}
+
+    /** Number of bytes in one megabyte (1024 * 1024). */
+    public static final long MB = 1024L * 1024L;
+
+    /**
+     * Converts a size in bytes to megabytes.
+     *
+     * @param size size in bytes
+     * @return the size in megabytes, possibly fractional
+     */
+    public static double toMb(long size) {
+        final double bytesPerMb = MB;
+        return size / bytesPerMb;
+    }
+}
diff --git a/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/util/ConfigUtils.java b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/util/ConfigUtils.java
new file mode 100644
index 000000000..027ed27d8
--- /dev/null
+++ b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/util/ConfigUtils.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.connect.util;
+
+import java.util.Properties;
+
+/**
+ * Helpers for reading typed values out of {@link Properties}.
+ */
+public class ConfigUtils {
+
+    /**
+     * Reads a property as a {@code long}.
+     *
+     * @param properties   source of the value
+     * @param key          property to read
+     * @param defaultValue value returned when the property is absent or is
+     *                     not a valid {@code long}
+     * @return the parsed value, or {@code defaultValue}
+     */
+    public static long getLong(Properties properties, String key, long defaultValue) {
+        final String value = properties.getProperty(key);
+        if (value == null) {
+            return defaultValue;
+        }
+
+        try {
+            // trim: Properties.load keeps trailing whitespace, and without this
+            // "5 " would silently be replaced by the default
+            return Long.parseLong(value.trim());
+        } catch (NumberFormatException ex) {
+            // deliberate best effort: an unparsable value falls back to the default
+            return defaultValue;
+        }
+    }
+
+    private ConfigUtils() {}
+}
diff --git a/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/util/InstanceBuilder.java b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/util/InstanceBuilder.java
new file mode 100644
index 000000000..f930fdaca
--- /dev/null
+++ b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/util/InstanceBuilder.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.connect.util;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.LinkedList;
+import java.util.List;
+
+// Based off apache beam https://github.com/apache/beam
+// org.apache.beam.sdk.util.InstanceBuilder.java class
+/**
+ * Builds instances of a type reflectively, by invoking a constructor of a
+ * class chosen by name or by {@link Class} object.
+ *
+ * @param <T> type returned by {@link #build}
+ */
+public class InstanceBuilder<T> {
+
+    /**
+     * Create an InstanceBuilder for the given type.
+     *
+     * <p>The specified type is the type returned by {@link #build}, which is
+     * typically the common base type or interface of the instance being
+     * constructed.
+     */
+    public static <T> InstanceBuilder<T> ofType(Class<T> type) {
+        return new InstanceBuilder<>(type);
+    }
+
+    /**
+     * Sets the class name to be constructed.
+     *
+     * <p>If the name is a simple name (ie {@link Class#getSimpleName()}), then
+     * the package of the return type is added as a prefix.
+     *
+     * <p>The default class is the return type, specified in {@link #ofType}.
+     *
+     * <p>Modifies and returns the {@code InstanceBuilder} for chaining.
+     *
+     * @throws ClassNotFoundException if no class can be found by the given name
+     * @throws IllegalArgumentException if a class has already been specified
+     */
+    public InstanceBuilder<T> fromClassName(String name) throws ClassNotFoundException {
+        if (factoryClass != null) {
+            throw new IllegalArgumentException("Class name may only be specified once");
+        }
+
+        if (name.indexOf('.') == -1) {
+            // the return type may have no (named) package; then use the name as given
+            final Package typePackage = type.getPackage();
+            if (typePackage != null && !typePackage.getName().isEmpty()) {
+                name = typePackage.getName() + "." + name;
+            }
+        }
+
+        try {
+            factoryClass = Class.forName(name);
+        } catch (ClassNotFoundException e) {
+            throw new ClassNotFoundException(
+                    String.format("Could not find class: %s", name), e);
+        }
+        return this;
+    }
+
+    /**
+     * Sets the factory class to use for instance construction.
+     *
+     * <p>Modifies and returns the {@code InstanceBuilder} for chaining.
+     */
+    public InstanceBuilder<T> fromClass(Class<?> factoryClass) {
+        this.factoryClass = factoryClass;
+        return this;
+    }
+
+    /**
+     * Creates the instance by calling a constructor of the factory class.
+     *
+     * <h3>Defaults</h3>
+     * <ul>
+     *   <li>factory class: defaults to the output type class, overridden
+     *   via {@link #fromClassName(String)} or {@link #fromClass(Class)}.
+     * </ul>
+     *
+     * @throws IllegalStateException if the factory class is not assignable
+     * to the output type
+     * @throws RuntimeException if the constructor does not exist or fails
+     */
+    public T build() {
+        if (factoryClass == null) {
+            factoryClass = type;
+        }
+
+        Class<?>[] types = parameterTypes
+                .toArray(new Class<?>[parameterTypes.size()]);
+
+        // TODO: cache results, to speed repeated type lookups?
+        return buildFromConstructor(types);
+    }
+
+    /////////////////////////////////////////////////////////////////////////////
+
+    /**
+     * Type of object to construct.
+     */
+    private final Class<T> type;
+
+    /**
+     * Types of parameters for constructor lookup. Nothing in this class adds
+     * to the list, so the no-argument constructor is the one looked up.
+     *
+     * @see Class#getDeclaredConstructor(Class[])
+     */
+    private final List<Class<?>> parameterTypes = new LinkedList<>();
+
+    /**
+     * Arguments passed to the constructor; parallel to {@link #parameterTypes}.
+     */
+    private final List<Object> arguments = new LinkedList<>();
+
+    /**
+     * Name of factory method, or null to invoke the constructor.
+     * Currently never set: only constructors are used.
+     */
+    private String methodName;
+
+    /**
+     * Factory class, or null to instantiate {@code type}.
+     */
+    private Class<?> factoryClass;
+
+    private InstanceBuilder(Class<T> type) {
+        this.type = type;
+    }
+
+    private T buildFromConstructor(Class<?>[] types) {
+        if (factoryClass == null) {
+            throw new IllegalStateException("factoryClass is null");
+        }
+
+        // check the type first so a wrong class is reported as a type mismatch
+        // rather than as a missing constructor
+        if (!type.isAssignableFrom(factoryClass)) {
+            throw new IllegalStateException("Instance type " + factoryClass.getName()
+                    + " must be assignable to " + type.getSimpleName());
+        }
+
+        try {
+            Constructor<?> constructor = factoryClass.getDeclaredConstructor(types);
+
+            if (!constructor.isAccessible()) {
+                constructor.setAccessible(true);
+            }
+
+            Object[] args = arguments.toArray(new Object[arguments.size()]);
+            return type.cast(constructor.newInstance(args));
+
+        } catch (NoSuchMethodException e) {
+            // keep the cause so the missing signature shows up in the stack trace
+            throw new RuntimeException("Unable to find constructor for "
+                    + factoryClass.getName(), e);
+
+        } catch (InvocationTargetException
+                | InstantiationException
+                | IllegalAccessException e) {
+            throw new RuntimeException("Failed to construct instance from "
+                    + "constructor " + factoryClass.getName(), e);
+        }
+    }
+}
+
diff --git a/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/util/PropertiesValidator.java b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/util/PropertiesValidator.java
new file mode 100644
index 000000000..0e16946f7
--- /dev/null
+++ b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/util/PropertiesValidator.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.connect.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Static helpers for checking that a {@link Properties} object defines a set
+ * of required keys.
+ */
+public final class PropertiesValidator {
+
+    private PropertiesValidator() {
+        // static utility, not instantiable
+    }
+
+    /**
+     * Collects the required keys that are not present in {@code properties}.
+     *
+     * @param properties the properties to inspect
+     * @param requiredKeys keys that are expected to be present
+     * @return the missing keys, in the order they were requested; empty when
+     *         every key is present
+     */
+    public static List<String> validate(Properties properties, String... requiredKeys) {
+        final List<String> absent = new ArrayList<>();
+        for (int i = 0; i < requiredKeys.length; i++) {
+            final String required = requiredKeys[i];
+            if (properties.containsKey(required)) {
+                continue;
+            }
+            absent.add(required);
+        }
+        return absent;
+    }
+
+    /**
+     * Same check as {@link #validate(Properties, String...)}, but fails
+     * instead of returning the missing keys.
+     *
+     * @param properties the properties to inspect
+     * @param requiredKeys keys that are expected to be present
+     * @throws IllegalStateException if at least one required key is missing;
+     *         the message lists the missing keys
+     */
+    public static void validateThrowIfMissingKeys(Properties properties,
+            String... requiredKeys) {
+        final List<String> absent = validate(properties, requiredKeys);
+        if (absent.isEmpty()) {
+            return;
+        }
+
+        final String prefix;
+        if (absent.size() > 1) {
+            prefix = "missing properties: ";
+        } else {
+            prefix = "missing property: ";
+        }
+        throw new IllegalStateException(prefix + absent);
+    }
+}
diff --git a/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/util/PulsarUtils.java b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/util/PulsarUtils.java
new file mode 100644
index 000000000..8d43a4f87
--- /dev/null
+++ b/pulsar-connectors/core/src/main/java/org/apache/pulsar/connect/util/PulsarUtils.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.connect.util;
+
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.connect.config.ConnectorConfiguration;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Static helpers for building Pulsar clients from connector properties.
+ */
+public final class PulsarUtils {
+
+    private PulsarUtils() {
+        // static utility, not instantiable
+    }
+
+    /**
+     * Creates a {@link PulsarClient} for the service URL stored under
+     * {@link ConnectorConfiguration#KEY_SERVICE_URL}, falling back to
+     * {@link ConnectorConfiguration#DEFAULT_SERVICE_URL} when the key is absent.
+     * The operation timeout is set to
+     * {@link ConnectorConfiguration#DEFAULT_OPERATION_TIMEOUT_SECONDS} seconds.
+     *
+     * @param properties connector properties to read the service URL from
+     * @return a newly created client
+     * @throws PulsarClientException if the client cannot be created
+     */
+    public static PulsarClient createClient(Properties properties) throws PulsarClientException {
+        final String url = properties.getProperty(
+                ConnectorConfiguration.KEY_SERVICE_URL,
+                ConnectorConfiguration.DEFAULT_SERVICE_URL);
+
+        final ClientConfiguration clientConfig = new ClientConfiguration();
+        clientConfig.setOperationTimeout(
+                ConnectorConfiguration.DEFAULT_OPERATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+        return PulsarClient.create(url, clientConfig);
+    }
+}
diff --git a/pulsar-connectors/google-cloud/pom.xml b/pulsar-connectors/google-cloud/pom.xml
new file mode 100644
index 000000000..b087a03d4
--- /dev/null
+++ b/pulsar-connectors/google-cloud/pom.xml
@@ -0,0 +1,134 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <packaging>jar</packaging>
+
+    <parent>
+        <groupId>org.apache.pulsar</groupId>
+        <artifactId>pulsar-connectors</artifactId>
+        <version>1.19-incubating-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>pulsar-connectors-google-cloud</artifactId>
+    <name>Pulsar Connectors Google Cloud</name>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-connectors-core</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.apis</groupId>
+            <artifactId>google-api-services-storage</artifactId>
+            <version>v1-rev108-1.22.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.api-client</groupId>
+            <artifactId>google-api-client</artifactId>
+            <version>1.22.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.cloud.bigdataoss</groupId>
+            <artifactId>gcsio</artifactId>
+            <version>1.6.1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.cloud.bigdataoss</groupId>
+            <artifactId>util</artifactId>
+            <version>1.6.1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>17.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.auto.service</groupId>
+            <artifactId>auto-service</artifactId>
+            <version>1.0-rc3</version>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+                <filtering>true</filtering>
+            </resource>
+        </resources>
+
+        <plugins>
+            <plugin>
+                <!-- Shade all the dependencies to avoid conflicts -->
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <createDependencyReducedPom>false</createDependencyReducedPom>
+                            <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+
+                            <artifactSet>
+                                <includes>
+                                    <include>com.google.*:*</include>
+                                </includes>
+                            </artifactSet>
+
+                            <!-- Patterns are bare package prefixes: a trailing '/' or '.' would be matched and dropped from relocated names -->
+                            <relocations>
+                                <relocation>
+                                    <pattern>com.google.protobuf</pattern>
+                                    <shadedPattern>org.apache.pulsar.shade.gcs.com.google.protobuf</shadedPattern>
+                                </relocation>
+
+                                <relocation>
+                                    <pattern>com.google.common</pattern>
+                                    <shadedPattern>org.apache.pulsar.shade.gcs.com.google.common</shadedPattern>
+                                </relocation>
+
+                                <relocation>
+                                    <pattern>com.google.thirdparty</pattern>
+                                    <shadedPattern>org.apache.pulsar.shade.gcs.com.google.thirdparty</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/pulsar-connectors/google-cloud/src/main/java/org/apahce/pulsar/common/io/cloud/gcs/GcsFileSystem.java b/pulsar-connectors/google-cloud/src/main/java/org/apahce/pulsar/common/io/cloud/gcs/GcsFileSystem.java
new file mode 100644
index 000000000..7f0b98cf2
--- /dev/null
+++ b/pulsar-connectors/google-cloud/src/main/java/org/apahce/pulsar/common/io/cloud/gcs/GcsFileSystem.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apahce.pulsar.common.io.cloud.gcs;
+
+import org.apache.pulsar.common.io.FileSystem;
+
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * {@link FileSystem} implementation for the {@code gs} scheme that writes
+ * through a {@link GcsHelper}.
+ *
+ * <p>Code based on apache beam io.
+ */
+public class GcsFileSystem extends FileSystem<GcsResourceId> {
+
+    private final GcsHelper helper;
+
+    GcsFileSystem(GcsHelper helper) {
+        this.helper = helper;
+    }
+
+    /**
+     * Opens a channel for writing the object identified by {@code resourceId}.
+     */
+    @Override
+    protected WritableByteChannel create(GcsResourceId resourceId) throws IOException {
+        // TODO check that bucket exists before writing
+        final GcsPath target = resourceId.getGcsPath();
+        return helper.create(target);
+    }
+
+    /**
+     * Builds a resource id from a single resource spec.
+     *
+     * <p>Directory specs get a trailing '/' appended when it is missing; file
+     * specs must not end with '/'.
+     *
+     * @throws IllegalStateException if a file spec ends with '/'
+     */
+    @Override
+    protected GcsResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
+        final boolean hasTrailingSlash = singleResourceSpec.endsWith("/");
+
+        if (!isDirectory && hasTrailingSlash) {
+            throw new IllegalStateException(
+                    String.format("Expected a file path, but [%s], ends with '/'. " +
+                            "This is unsupported in GcsFileSystem.", singleResourceSpec)
+            );
+        }
+
+        final String spec = (isDirectory && !hasTrailingSlash)
+                ? singleResourceSpec + '/'
+                : singleResourceSpec;
+        return GcsResourceId.fromGcsPath(GcsPath.fromUri(spec));
+    }
+
+    /**
+     * Returns the URI scheme handled by this file system.
+     */
+    @Override
+    protected String getScheme() {
+        return "gs";
+    }
+}
diff --git a/pulsar-connectors/google-cloud/src/main/java/org/apahce/pulsar/common/io/cloud/gcs/GcsFileSystemFactory.java b/pulsar-connectors/google-cloud/src/main/java/org/apahce/pulsar/common/io/cloud/gcs/GcsFileSystemFactory.java
new file mode 100644
index 000000000..5739d1a6b
--- /dev/null
+++ b/pulsar-connectors/google-cloud/src/main/java/org/apahce/pulsar/common/io/cloud/gcs/GcsFileSystemFactory.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apahce.pulsar.common.io.cloud.gcs;
+
+import com.google.api.client.auth.oauth2.Credential;
+import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
+import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.services.storage.Storage;
+import com.google.api.services.storage.StorageScopes;
+import com.google.auto.service.AutoService;
+
+import org.apache.pulsar.common.io.FileSystem;
+import org.apache.pulsar.common.io.FileSystemFactory;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.Properties;
+
+/**
+ * {@link FileSystemFactory} that builds a {@link GcsFileSystem} backed by a
+ * Google Cloud Storage client.
+ *
+ * <p>Credentials are read from the service account key file named by the
+ * {@code google.cloud.service.account.keyfile} property when it is set, and
+ * from the application default credentials otherwise.
+ */
+@AutoService(FileSystemFactory.class)
+public class GcsFileSystemFactory implements FileSystemFactory {
+
+    private static final String KEY_SERVICE_ACCOUNT_KEY_FILE =
+            "google.cloud.service.account.keyfile";
+
+    /**
+     * Creates the file system.
+     *
+     * @param properties connector properties, optionally naming a key file
+     * @return a new {@link GcsFileSystem}
+     * @throws RuntimeException wrapping any I/O or security failure raised
+     *         while creating the storage client
+     */
+    @Override
+    public FileSystem create(Properties properties) {
+        final Storage storage;
+        try {
+            storage = createStorage(properties);
+        } catch (IOException | GeneralSecurityException ex) {
+            throw new RuntimeException(ex);
+        }
+
+        return new GcsFileSystem(GcsHelper.create(storage));
+    }
+
+    /** Builds the storage client from the resolved credential. */
+    private Storage createStorage(Properties properties)
+            throws IOException, GeneralSecurityException {
+        final Credential credential = createCredential(properties);
+        final HttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport();
+        final JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
+        return new Storage.Builder(httpTransport, jsonFactory, credential).build();
+    }
+
+    /**
+     * Resolves the credential, scoped with {@link StorageScopes#all()}.
+     *
+     * @throws IOException if the key file cannot be opened or read, or the
+     *         application default credentials cannot be obtained
+     */
+    private Credential createCredential(Properties properties) throws IOException {
+        if (properties.containsKey(KEY_SERVICE_ACCOUNT_KEY_FILE)) {
+            final String credentialsPath =
+                    properties.getProperty(KEY_SERVICE_ACCOUNT_KEY_FILE);
+            // Close the key file here instead of relying on the callee to do it.
+            try (FileInputStream keyStream = new FileInputStream(credentialsPath)) {
+                return GoogleCredential
+                        .fromStream(keyStream)
+                        .createScoped(StorageScopes.all());
+            }
+        }
+
+        return GoogleCredential.getApplicationDefault().createScoped(StorageScopes.all());
+    }
+}
diff --git a/pulsar-connectors/google-cloud/src/main/java/org/apahce/pulsar/common/io/cloud/gcs/GcsHelper.java b/pulsar-connectors/google-cloud/src/main/java/org/apahce/pulsar/common/io/cloud/gcs/GcsHelper.java
new file mode 100644
index 000000000..bfd463806
--- /dev/null
+++ b/pulsar-connectors/google-cloud/src/main/java/org/apahce/pulsar/common/io/cloud/gcs/GcsHelper.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apahce.pulsar.common.io.cloud.gcs;
+
+
+import com.google.api.services.storage.Storage;
+import com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel;
+import com.google.cloud.hadoop.gcsio.ObjectWriteConditions;
+import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
+import com.google.cloud.hadoop.util.ClientRequestHelper;
+
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Package-private helper that opens write channels on a {@link Storage}
+ * client.
+ */
+class GcsHelper {
+
+    private final Storage storageClient;
+
+    private final ExecutorService executorService;
+
+    /**
+     * Creates a helper for {@code storage} backed by a new single-thread
+     * executor.
+     */
+    static GcsHelper create(Storage storage) {
+        // NOTE(review): every channel opened by this helper shares this one
+        // worker thread, and nothing in this class shuts the executor down
+        // - confirm that is intended.
+        final ExecutorService worker = Executors.newSingleThreadExecutor();
+        return new GcsHelper(storage, worker);
+    }
+
+    private GcsHelper(Storage storageClient, ExecutorService executorService) {
+        this.storageClient = storageClient;
+        this.executorService = executorService;
+    }
+
+    /**
+     * Opens and initializes a write channel for the bucket and object named
+     * by {@code path}, using default channel options, default write
+     * conditions and no object metadata.
+     *
+     * @throws IOException if the channel cannot be initialized
+     */
+    WritableByteChannel create(GcsPath path) throws IOException {
+        final AsyncWriteChannelOptions channelOptions =
+                AsyncWriteChannelOptions.newBuilder().build();
+        final ObjectWriteConditions writeConditions = new ObjectWriteConditions();
+
+        final GoogleCloudStorageWriteChannel writeChannel = new GoogleCloudStorageWriteChannel(
+                executorService,
+                storageClient,
+                new ClientRequestHelper<>(),
+                path.getBucket(),
+                path.getObject(),
+                channelOptions,
+                writeConditions,
+                Collections.emptyMap());
+        writeChannel.initialize();
+        return writeChannel;
+    }
+}
diff --git a/pulsar-connectors/google-cloud/src/main/java/org/apahce/pulsar/common/io/cloud/gcs/GcsPath.java b/pulsar-connectors/google-cloud/src/main/java/org/apahce/pulsar/common/io/cloud/gcs/GcsPath.java
new file mode 100644
index 000000000..be4967cc0
--- /dev/null
+++ b/pulsar-connectors/google-cloud/src/main/java/org/apahce/pulsar/common/io/cloud/gcs/GcsPath.java
@@ -0,0 +1,627 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apahce.pulsar.common.io.cloud.gcs;
+
+import com.google.api.services.storage.model.StorageObject;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.*;
+import java.util.Iterator;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static com.google.api.client.util.Strings.isNullOrEmpty;
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Copied from apache beam project
+ *
+ * Implements the Java NIO {@link Path} API for Google Cloud Storage paths.
+ *
+ * <p>GcsPath uses a slash ('/') as a directory separator.  Below is
+ * a summary of how slashes are treated:
+ * <ul>
+ *   <li> A GCS bucket may not contain a slash.  An object may contain zero or
+ *        more slashes.
+ *   <li> A trailing slash always indicates a directory, which is compliant
+ *        with POSIX.1-2008.
+ *   <li> Slashes separate components of a path.  Empty components are allowed,
+ *        these are represented as repeated slashes.  An empty component always
+ *        refers to a directory, and always ends in a slash.
+ *   <li> {@link #getParent()} always returns a path ending in a slash, as the
+ *        parent of a GcsPath is always a directory.
+ *   <li> Use {@link #resolve(String)} to append elements to a GcsPath -- this
+ *        applies the rules consistently and is highly recommended over any
+ *        custom string concatenation.
+ * </ul>
+ *
+ * <p>GcsPath treats all GCS objects and buckets as belonging to the same
+ * filesystem, so the root of a GcsPath is the GcsPath bucket="", object="".
+ *
+ * <p>Relative paths are not associated with any bucket.  This matches common
+ * treatment of Path in which relative paths can be constructed from one
+ * filesystem and appended to another filesystem.
+ *
+ * @see <a href=
+ * "http://docs.oracle.com/javase/tutorial/essential/io/pathOps.html"
+ * >Java Tutorials: Path Operations</a>
+ */
+public class GcsPath implements Path, Serializable {
+
+    public static final String SCHEME = "gs";
+
+    /**
+     * Creates a GcsPath from a URI.
+     *
+     * <p>The URI must be in the form {@code gs://[bucket]/[path]}, and may not
+     * contain a port, user info, a query, or a fragment.
+     *
+     * @param uri the URI to convert
+     * @return the GcsPath parsed from the string form of {@code uri}
+     * @throws IllegalArgumentException if the URI has no scheme, a scheme
+     *         other than {@code gs}, or any of the disallowed components
+     */
+    public static GcsPath fromUri(URI uri) {
+        // Constant-first comparison: URI#getScheme() is null for scheme-less URIs.
+        checkArgument(SCHEME.equalsIgnoreCase(uri.getScheme()), "URI: %s is not a GCS URI", uri);
+        // Preconditions templates substitute only %s placeholders.
+        checkArgument(uri.getPort() == -1,
+                "GCS URI may not specify port: %s (%s)", uri, uri.getPort());
+        checkArgument(
+                isNullOrEmpty(uri.getUserInfo()),
+                "GCS URI may not specify userInfo: %s (%s)", uri, uri.getUserInfo());
+        checkArgument(
+                isNullOrEmpty(uri.getQuery()),
+                "GCS URI may not specify query: %s (%s)", uri, uri.getQuery());
+        checkArgument(
+                isNullOrEmpty(uri.getFragment()),
+                "GCS URI may not specify fragment: %s (%s)", uri, uri.getFragment());
+
+        return fromUri(uri.toString());
+    }
+
+    /**
+     * Pattern that is used to parse a GCS URL.
+     *
+     * <p>This is used to separate the components.  Verification is handled
+     * separately.
+     */
+    public static final Pattern GCS_URI =
+            Pattern.compile("(?<SCHEME>[^:]+)://(?<BUCKET>[^/]+)(/(?<OBJECT>.*))?");
+
+    /**
+     * Parses a GCS URI given as a string.
+     *
+     * <p>The string is matched against {@link #GCS_URI} rather than being run
+     * through URI parsing, so it may accept inputs that the URI parser would
+     * reject.
+     *
+     * @throws IllegalArgumentException if the string does not match the
+     *         pattern or its scheme is not {@code gs}
+     */
+    public static GcsPath fromUri(String uri) {
+        final Matcher matcher = GCS_URI.matcher(uri);
+        checkArgument(matcher.matches(), "Invalid GCS URI: %s", uri);
+
+        final String scheme = matcher.group("SCHEME");
+        checkArgument(scheme.equalsIgnoreCase(SCHEME),
+                "URI: %s is not a GCS URI", uri);
+
+        final String bucketName = matcher.group("BUCKET");
+        final String objectName = matcher.group("OBJECT");
+        return new GcsPath(null, bucketName, objectName);
+    }
+
+    /**
+     * Pattern that is used to parse a GCS resource name.
+     *
+     * <p>The dots of the host name are escaped so that they match literally
+     * instead of matching any character.
+     */
+    private static final Pattern GCS_RESOURCE_NAME =
+            Pattern.compile("storage\\.googleapis\\.com/(?<BUCKET>[^/]+)(/(?<OBJECT>.*))?");
+
+    /**
+     * Creates a GcsPath from a OnePlatform resource name in string form.
+     *
+     * @param name resource name of the form
+     *             {@code storage.googleapis.com/[bucket]/[object]}
+     * @return the GcsPath for the bucket and object found in {@code name}
+     * @throws IllegalArgumentException if {@code name} does not match that form
+     */
+    public static GcsPath fromResourceName(String name) {
+        Matcher m = GCS_RESOURCE_NAME.matcher(name);
+        checkArgument(m.matches(), "Invalid GCS resource name: %s", name);
+
+        return new GcsPath(null, m.group("BUCKET"), m.group("OBJECT"));
+    }
+
+    /**
+     * Creates a GcsPath from a {@linkplain StorageObject}.
+     */
+    public static GcsPath fromObject(StorageObject object) {
+        return new GcsPath(null, object.getBucket(), object.getName());
+    }
+
+    /**
+     * Creates a GcsPath from bucket and object components.
+     *
+     * <p>A GcsPath without a bucket name is treated as a relative path, which
+     * is a path component with no linkage to the root element.  This is similar
+     * to a Unix path that does not begin with the root marker (a slash).
+     * GCS has different naming constraints and APIs for working with buckets and
+     * objects, so these two concepts are kept separate to avoid accidental
+     * attempts to treat objects as buckets, or vice versa, as much as possible.
+     *
+     * <p>A GcsPath without an object name is a bucket reference.
+     * A bucket is always a directory, which could be used to lookup or add
+     * files to a bucket, but could not be opened as a file.
+     *
+     * <p>A GcsPath containing neither bucket or object names is treated as
+     * the root of the GCS filesystem.  A listing on the root element would return
+     * the buckets available to the user.
+     *
+     * <p>If {@code null} is passed as either parameter, it is converted to an
+     * empty string internally for consistency.  There is no distinction between
+     * an empty string and a {@code null}, as neither are allowed by GCS.
+     *
+     * @param bucket a GCS bucket name, or none ({@code null} or an empty string)
+     *               if the object is not associated with a bucket
+     *               (e.g. relative paths or the root node).
+     * @param object a GCS object path, or none ({@code null} or an empty string)
+     *               for no object.
+     */
+    public static GcsPath fromComponents(@Nullable String bucket,
+                                         @Nullable String object) {
+        return new GcsPath(null, bucket, object);
+    }
+
+    @Nullable
+    private transient FileSystem fs;
+    @Nonnull
+    private final String bucket;
+    @Nonnull
+    private final String object;
+
+    /**
+     * Constructs a GcsPath.
+     *
+     * <p>A {@code null} bucket or object is stored as an empty string.
+     *
+     * @param fs the associated FileSystem, if any
+     * @param bucket the associated bucket, or none ({@code null} or an empty
+     *               string) for a relative path component
+     * @param object the object, which is a fully-qualified object name if bucket
+     *               was also provided, or none ({@code null} or an empty string)
+     *               for no object
+     * @throws java.lang.IllegalArgumentException if the bucket or object names
+     *         are invalid.
+     */
+    public GcsPath(@Nullable FileSystem fs,
+                   @Nullable String bucket,
+                   @Nullable String object) {
+        final String bucketName = (bucket == null) ? "" : bucket;
+        checkArgument(!bucketName.contains("/"),
+                "GCS bucket may not contain a slash");
+
+        final boolean validBucket = bucketName.isEmpty()
+                || bucketName.matches("[a-z0-9][-_a-z0-9.]+[a-z0-9]");
+        checkArgument(validBucket,
+                "GCS bucket names must contain only lowercase letters, numbers, "
+                        + "dashes (-), underscores (_), and dots (.). Bucket names "
+                        + "must start and end with a number or letter. "
+                        + "See https://developers.google.com/storage/docs/bucketnaming "
+                        + "for more details.  Bucket name: " + bucketName);
+
+        final String objectName = (object == null) ? "" : object;
+        final boolean hasLineBreak =
+                objectName.indexOf('\n') >= 0 || objectName.indexOf('\r') >= 0;
+        checkArgument(
+                !hasLineBreak,
+                "GCS object names must not contain Carriage Return or "
+                        + "Line Feed characters.");
+
+        this.fs = fs;
+        this.bucket = bucketName;
+        this.object = objectName;
+    }
+
+    /**
+     * Returns the bucket name associated with this GCS path, or an empty string
+     * if this is a relative path component.
+     */
+    public String getBucket() {
+        return bucket;
+    }
+
+    /**
+     * Returns the object name associated with this GCS path, or an empty string
+     * if no object is specified.
+     */
+    public String getObject() {
+        return object;
+    }
+
+    /**
+     * Replaces the FileSystem associated with this path.
+     *
+     * @param fs the FileSystem to associate with this path
+     */
+    public void setFileSystem(FileSystem fs) {
+        this.fs = fs;
+    }
+
+    /**
+     * Returns the FileSystem associated with this path, which may be
+     * {@code null} (the field is declared {@code @Nullable}).
+     */
+    @Override
+    public FileSystem getFileSystem() {
+        return fs;
+    }
+
+    /**
+     * A path is absolute when it names a bucket, or when it is the root
+     * (neither bucket nor object). Only an object without a bucket is relative.
+     */
+    @Override
+    public boolean isAbsolute() {
+        final boolean relative = bucket.isEmpty() && !object.isEmpty();
+        return !relative;
+    }
+
+    /**
+     * Returns a GcsPath with an empty bucket and an empty object that shares
+     * this path's FileSystem.
+     */
+    @Override
+    public GcsPath getRoot() {
+        return new GcsPath(fs, "", "");
+    }
+
+    /**
+     * Returns the last name element of this path.
+     *
+     * @throws UnsupportedOperationException if the path has fewer than two
+     *         name elements
+     */
+    @Override
+    public GcsPath getFileName() {
+        final int names = getNameCount();
+        if (names >= 2) {
+            return getName(names - 1);
+        }
+        throw new UnsupportedOperationException(
+                "Can't get filename from root path in the bucket: " + this);
+    }
+
+    /**
+     * Returns the <em>parent path</em>, or {@code null} if this path does not
+     * have a parent.
+     *
+     * <p>The root has no parent, a bucket's parent is the root, and a relative
+     * path with a single component has no parent. Otherwise the result is the
+     * object up to and including its last interior '/', so it always refers
+     * to a directory.
+     */
+    @Override
+    public GcsPath getParent() {
+        if (object.isEmpty()) {
+            // Root has no parent; every bucket hangs off the common root.
+            return bucket.isEmpty() ? null : getRoot();
+        }
+
+        // Start one character early so a trailing slash is not the match.
+        final int lastSeparator = object.lastIndexOf('/', object.length() - 2);
+        if (lastSeparator > 0) {
+            // Keep the separator itself so the parent ends in '/'.
+            return new GcsPath(fs, bucket, object.substring(0, lastSeparator + 1));
+        }
+
+        // Relative paths are not attached to the root node.
+        return bucket.isEmpty() ? null : new GcsPath(fs, bucket, "");
+    }
+
+    /**
+     * Counts the name elements: one for a non-empty bucket, one per '/' in
+     * the object, plus one more when the object does not end in '/'.
+     */
+    @Override
+    public int getNameCount() {
+        final int bucketNames = bucket.isEmpty() ? 0 : 1;
+        if (object.isEmpty()) {
+            return bucketNames;
+        }
+
+        int separators = 0;
+        for (int i = 0; i < object.length(); i++) {
+            if (object.charAt(i) == '/') {
+                separators++;
+            }
+        }
+
+        final int trailingName = object.endsWith("/") ? 0 : 1;
+        return bucketNames + separators + trailingName;
+    }
+
+    /**
+     * Returns the name element at position {@code count}, found by walking
+     * this path's iterator.
+     *
+     * @throws IllegalArgumentException if {@code count} is negative or the
+     *         path has no element at that position
+     */
+    @Override
+    public GcsPath getName(int count) {
+        checkArgument(count >= 0);
+
+        final Iterator<Path> names = iterator();
+        int toSkip = count;
+        while (toSkip > 0) {
+            checkArgument(names.hasNext());
+            names.next();
+            toSkip--;
+        }
+
+        checkArgument(names.hasNext());
+        return (GcsPath) names.next();
+    }
+
+    /**
+     * Returns the name elements from {@code beginIndex} (inclusive) to
+     * {@code endIndex} (exclusive), resolved onto one another in order.
+     *
+     * @throws IllegalArgumentException if {@code beginIndex} is negative,
+     *         {@code endIndex} is not greater than {@code beginIndex}, or the
+     *         path has too few name elements
+     */
+    @Override
+    public GcsPath subpath(int beginIndex, int endIndex) {
+        checkArgument(beginIndex >= 0);
+        checkArgument(endIndex > beginIndex);
+
+        final Iterator<Path> names = iterator();
+        for (int skipped = 0; skipped < beginIndex; skipped++) {
+            checkArgument(names.hasNext());
+            names.next();
+        }
+
+        // The range is never empty, so the first element seeds the result.
+        checkArgument(names.hasNext());
+        GcsPath result = (GcsPath) names.next();
+
+        for (int index = beginIndex + 1; index < endIndex; index++) {
+            checkArgument(names.hasNext());
+            result = result.resolve(names.next());
+        }
+
+        return result;
+    }
+
+    /**
+     * Tests whether this path starts with the given path. A {@code GcsPath} is compared
+     * by its "bucket/object" text; any other path type is compared by its
+     * {@code toString()} form. The actual check is done by {@link #startsWith(String)}.
+     *
+     * @param other the candidate prefix path
+     * @return true when the textual form of this path begins with that of {@code other}
+     */
+    @Override
+    public boolean startsWith(Path other) {
+        String prefix = (other instanceof GcsPath)
+                ? ((GcsPath) other).bucketAndObject()
+                : other.toString();
+        return startsWith(prefix);
+    }
+
+    /**
+     * Tests whether the "bucket/object" text of this path begins with {@code prefix}.
+     * This is a plain string-prefix check, not a component-by-component comparison.
+     *
+     * @param prefix the text to look for at the start of this path
+     * @return true when the bucket-and-object string starts with {@code prefix}
+     */
+    @Override
+    public boolean startsWith(String prefix) {
+        return bucketAndObject().startsWith(prefix);
+    }
+
+    /**
+     * Tests whether this path ends with the given path. A {@code GcsPath} is compared
+     * by its "bucket/object" text; any other path type is compared by its
+     * {@code toString()} form. The actual check is done by {@link #endsWith(String)}.
+     *
+     * @param other the candidate suffix path
+     * @return true when the textual form of this path ends with that of {@code other}
+     */
+    @Override
+    public boolean endsWith(Path other) {
+        String suffix = (other instanceof GcsPath)
+                ? ((GcsPath) other).bucketAndObject()
+                : other.toString();
+        return endsWith(suffix);
+    }
+
+    /**
+     * Tests whether the "bucket/object" text of this path ends with {@code suffix}.
+     * This is a plain string-suffix check, not a component-by-component comparison.
+     *
+     * @param suffix the text to look for at the end of this path
+     * @return true when the bucket-and-object string ends with {@code suffix}
+     */
+    @Override
+    public boolean endsWith(String suffix) {
+        return bucketAndObject().endsWith(suffix);
+    }
+
+    // TODO: support "." and ".." path components?
+    /**
+     * Returns this path unchanged; no redundant name elements are removed.
+     *
+     * @return this path
+     */
+    @Override
+    public GcsPath normalize() {
+        return this;
+    }
+
+    /**
+     * Resolves the given path against this one. An absolute {@code GcsPath} is returned
+     * as-is; a relative {@code GcsPath} contributes its object name; any other path type
+     * contributes its {@code toString()} form. String resolution is done by
+     * {@link #resolve(String)}.
+     *
+     * @param other the path to resolve against this path
+     * @return the resulting path
+     */
+    @Override
+    public GcsPath resolve(Path other) {
+        if (!(other instanceof GcsPath)) {
+            return resolve(other.toString());
+        }
+
+        GcsPath gcsOther = (GcsPath) other;
+        return gcsOther.isAbsolute() ? gcsOther : resolve(gcsOther.getObject());
+    }
+
+    /**
+     * Resolves the given path string against this path.
+     *
+     * <ul>
+     *   <li>A string that already starts with the scheme prefix is treated as an
+     *       absolute path: it is parsed with {@code fromUri} and bound to this
+     *       path's file system.</li>
+     *   <li>On the root path (empty bucket and object) any other string is read as
+     *       "bucket/object" by prepending the scheme prefix first.</li>
+     *   <li>Otherwise the string is appended to this path's object name, inserting a
+     *       '/' separator when needed. An empty string resolves to a directory ("/").</li>
+     * </ul>
+     *
+     * @param other the path string to resolve
+     * @return the resulting path
+     */
+    @Override
+    public GcsPath resolve(String other) {
+        String schemePrefix = SCHEME + "://";
+
+        if (bucket.isEmpty() && object.isEmpty() && !other.startsWith(schemePrefix)) {
+            // Resolve on a root path is equivalent to looking up a bucket and object.
+            // Only add the prefix when it is missing, so an already-absolute string
+            // does not end up with the scheme twice.
+            other = schemePrefix + other;
+        }
+
+        if (other.startsWith(schemePrefix)) {
+            GcsPath path = GcsPath.fromUri(other);
+            path.setFileSystem(getFileSystem());
+            return path;
+        }
+
+        if (other.isEmpty()) {
+            // An empty component MUST refer to a directory.
+            other = "/";
+        }
+
+        if (object.isEmpty()) {
+            return new GcsPath(fs, bucket, other);
+        } else if (object.endsWith("/")) {
+            return new GcsPath(fs, bucket, object + other);
+        } else {
+            return new GcsPath(fs, bucket, object + "/" + other);
+        }
+    }
+
+    /**
+     * Not supported for {@link Path} arguments; see {@link #resolveSibling(String)}
+     * for the string variant.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public Path resolveSibling(Path other) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Path resolveSibling(String other) {
+        if (getNameCount() < 2) {
+            throw new UnsupportedOperationException("Can't resolve the sibling of a root path: " + this);
+        }
+        GcsPath parent = getParent();
+        return (parent == null) ? fromUri(other) : parent.resolve(other);
+    }
+
+    /**
+     * Not supported by this path implementation.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public Path relativize(Path other) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Returns this path unchanged; no conversion is performed.
+     *
+     * @return this path
+     */
+    @Override
+    public GcsPath toAbsolutePath() {
+        return this;
+    }
+
+    /**
+     * Returns this path unchanged. The link options are ignored and nothing in the
+     * body raises {@link IOException}; it is declared only to match the interface.
+     *
+     * @param options ignored
+     * @return this path
+     */
+    @Override
+    public GcsPath toRealPath(LinkOption... options) throws IOException {
+        return this;
+    }
+
+    /**
+     * Not supported by this path implementation.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public File toFile() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Watch registration is not supported by this path implementation.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public WatchKey register(WatchService watcher, WatchEvent.Kind<?>[] events,
+                             WatchEvent.Modifier... modifiers) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Watch registration is not supported by this path implementation.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public WatchKey register(WatchService watcher, WatchEvent.Kind<?>... events)
+            throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Returns an iterator over the name elements of this path, built from its
+     * "bucket/object" text. When the bucket is non-empty, the iterator is told that
+     * the first element is the bucket.
+     *
+     * @return a new iterator over the name elements
+     */
+    @Override
+    public Iterator<Path> iterator() {
+        return new NameIterator(fs, !bucket.isEmpty(), bucketAndObject());
+    }
+
+    /**
+     * Iterates over the '/'-separated components of a path string, yielding each one
+     * as a {@link GcsPath}. When {@code fullPath} is set, the first component is
+     * returned as a bucket-only path; every other component is returned as a relative
+     * path with an empty bucket. Removal is not supported.
+     */
+    private static class NameIterator implements Iterator<Path> {
+        private final FileSystem fs;
+        // True until the first component (the bucket) has been handed out.
+        private boolean fullPath;
+        // Remaining, not yet consumed part of the path; null or empty when exhausted.
+        private String name;
+
+        /**
+         * @param fs       file system passed to every path this iterator creates
+         * @param fullPath whether the first component of {@code name} is a bucket
+         * @param name     the path text to split on '/'
+         */
+        NameIterator(FileSystem fs, boolean fullPath, String name) {
+            this.fs = fs;
+            this.fullPath = fullPath;
+            this.name = name;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return !isNullOrEmpty(name);
+        }
+
+        /**
+         * Returns the next component and advances past its trailing separator.
+         *
+         * @throws java.util.NoSuchElementException when no components remain
+         */
+        @Override
+        public GcsPath next() {
+            // Honor the Iterator contract instead of failing with a
+            // NullPointerException (or returning an empty path) once exhausted.
+            if (!hasNext()) {
+                throw new java.util.NoSuchElementException();
+            }
+
+            int i = name.indexOf('/');
+            String component;
+            if (i >= 0) {
+                component = name.substring(0, i);
+                name = name.substring(i + 1);
+            } else {
+                component = name;
+                name = null;
+            }
+            if (fullPath) {
+                fullPath = false;
+                return new GcsPath(fs, component, "");
+            } else {
+                // Relative paths have no bucket.
+                return new GcsPath(fs, "", component);
+            }
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    /**
+     * Orders paths by bucket first, then by name elements taken one at a time.
+     * When one path runs out of elements first, the shorter path sorts first.
+     *
+     * @param other the path to compare against; must be a {@code GcsPath}
+     * @return a negative, zero or positive value following the usual compareTo contract
+     * @throws ClassCastException when {@code other} is not a {@code GcsPath}
+     */
+    @Override
+    public int compareTo(Path other) {
+        if (!(other instanceof GcsPath)) {
+            throw new ClassCastException();
+        }
+
+        GcsPath that = (GcsPath) other;
+        int bucketOrder = bucket.compareTo(that.bucket);
+        if (bucketOrder != 0) {
+            return bucketOrder;
+        }
+
+        // Compare one component at a time so that the separator character is never
+        // compared against component contents. E.g. "a/b" < "a-1/b".
+        Iterator<Path> mine = iterator();
+        Iterator<Path> theirs = that.iterator();
+
+        while (mine.hasNext() && theirs.hasNext()) {
+            int order = mine.next().toString().compareTo(theirs.next().toString());
+            if (order != 0) {
+                return order;
+            }
+        }
+
+        // At least one side is exhausted here; the side with elements left is greater.
+        if (mine.hasNext()) {
+            return 1;
+        }
+        return theirs.hasNext() ? -1 : 0;
+    }
+
+    /**
+     * Two paths are equal when they are of exactly the same class and have equal
+     * bucket and object names.
+     *
+     * @param o the object to compare with
+     * @return true when {@code o} is an equal path
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+
+        GcsPath that = (GcsPath) o;
+        if (!bucket.equals(that.bucket)) {
+            return false;
+        }
+        return object.equals(that.object);
+    }
+
+    /**
+     * Combines the hash codes of the bucket and object names, matching {@link #equals(Object)}.
+     *
+     * @return the hash code of this path
+     */
+    @Override
+    public int hashCode() {
+        return 31 * bucket.hashCode() + object.hashCode();
+    }
+
+    /**
+     * Renders this path as text. A non-absolute path is rendered as its object name
+     * only; otherwise the result is the scheme prefix, the bucket followed by '/'
+     * when the bucket is non-empty, and the object name.
+     *
+     * @return the string form of this path
+     */
+    @Override
+    public String toString() {
+        if (!isAbsolute()) {
+            return object;
+        }
+
+        String prefix = SCHEME + "://";
+        if (bucket.isEmpty()) {
+            return prefix + object;
+        }
+        return prefix + bucket + '/' + object;
+    }
+
+    // TODO: Consider using resource names for all GCS paths used by the SDK.
+    /**
+     * Renders this path as a resource name rooted at "storage.googleapis.com/",
+     * followed by the bucket and '/' when the bucket is non-empty, then the object name.
+     *
+     * @return the resource name of this path
+     */
+    public String toResourceName() {
+        String host = "storage.googleapis.com/";
+        if (bucket.isEmpty()) {
+            return host + object;
+        }
+        return host + bucket + '/' + object;
+    }
+
+    @Override
+    public URI toUri() {
+        try {
+            return new URI(SCHEME, "//" + bucketAndObject(), null);
+        } catch (URISyntaxException e) {
+            throw new RuntimeException("Unable to create URI for GCS path " + this);
+        }
+    }
+
+    /**
+     * Joins bucket and object into "bucket/object"; when the bucket is empty the
+     * object name is returned on its own.
+     */
+    private String bucketAndObject() {
+        return bucket.isEmpty() ? object : bucket + "/" + object;
+    }
+}
+
diff --git a/pulsar-connectors/google-cloud/src/main/java/org/apahce/pulsar/common/io/cloud/gcs/GcsResourceId.java b/pulsar-connectors/google-cloud/src/main/java/org/apahce/pulsar/common/io/cloud/gcs/GcsResourceId.java
new file mode 100644
index 000000000..6d80d121e
--- /dev/null
+++ b/pulsar-connectors/google-cloud/src/main/java/org/apahce/pulsar/common/io/cloud/gcs/GcsResourceId.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apahce.pulsar.common.io.cloud.gcs;
+
+import org.apache.pulsar.common.io.fs.ResourceId;
+
+import static com.google.api.client.util.Preconditions.checkNotNull;
+
+// code based on apache beam io
+/**
+ * {@link ResourceId} implementation that wraps a {@link GcsPath}. Equality, hash code
+ * and string form are all delegated to the wrapped path.
+ */
+class GcsResourceId implements ResourceId {
+
+    private final GcsPath gcsPath;
+
+    private GcsResourceId(GcsPath gcsPath) {
+        this.gcsPath = gcsPath;
+    }
+
+    /**
+     * Creates a resource id for the given path.
+     *
+     * @param gcsPath the path to wrap; must not be null
+     * @return a new resource id backed by {@code gcsPath}
+     */
+    static GcsResourceId fromGcsPath(GcsPath gcsPath) {
+        return new GcsResourceId(checkNotNull(gcsPath, "gcsPath"));
+    }
+
+    /** A resource is treated as a directory when its path ends with "/". */
+    @Override
+    public boolean isDirectory() {
+        return gcsPath.endsWith("/");
+    }
+
+    @Override
+    public String getScheme() {
+        return "gs";
+    }
+
+    /**
+     * Returns the last name element of the path as a string, or null when the path
+     * has at most one name element or has no file name.
+     */
+    @Override
+    public String getFilename() {
+        if (gcsPath.getNameCount() <= 1) {
+            return null;
+        }
+        GcsPath fileName = gcsPath.getFileName();
+        return (fileName != null) ? fileName.toString() : null;
+    }
+
+    /** Returns the wrapped path. */
+    GcsPath getGcsPath() {
+        return gcsPath;
+    }
+
+    @Override
+    public String toString() {
+        return gcsPath.toString();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return obj instanceof GcsResourceId
+                && gcsPath.equals(((GcsResourceId) obj).gcsPath);
+    }
+
+    @Override
+    public int hashCode() {
+        return gcsPath.hashCode();
+    }
+}
diff --git a/pulsar-connectors/pom.xml b/pulsar-connectors/pom.xml
new file mode 100644
index 000000000..a12d7e303
--- /dev/null
+++ b/pulsar-connectors/pom.xml
@@ -0,0 +1,40 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <!-- "pom" packaging: this project only groups the modules listed below. -->
+    <packaging>pom</packaging>
+
+    <!-- Inherits from the top-level pulsar POM, one directory up. -->
+    <parent>
+        <groupId>org.apache.pulsar</groupId>
+        <artifactId>pulsar</artifactId>
+        <version>1.19-incubating-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>pulsar-connectors</artifactId>
+    <name>Pulsar Connectors Java</name>
+
+    <!-- Connector sub-modules, each in a directory of the same name. -->
+    <modules>
+        <module>core</module>
+        <module>google-cloud</module>
+    </modules>
+</project>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services