Posted to commits@iceberg.apache.org by dw...@apache.org on 2019/08/09 19:18:48 UTC

[incubator-iceberg] branch master updated: Moving/Renaming hadoop module to filesystem (#277)

This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new b5446e4  Moving/Renaming hadoop module to filesystem (#277)
b5446e4 is described below

commit b5446e4f1dc8ac598db9d1fcae58d8b7369ee071
Author: TGooch44 <te...@gmail.com>
AuthorDate: Fri Aug 9 12:18:43 2019 -0700

    Moving/Renaming hadoop module to filesystem (#277)
---
 python/iceberg/core/avro/avro_schema_util.py       |  14 +-
 .../core/base_metastore_table_operations.py        |  43 +++-
 python/iceberg/core/data_files.py                  |   8 +-
 .../core/{hadoop => filesystem}/__init__.py        |  10 +-
 .../core/{hadoop => filesystem}/file_status.py     |   0
 .../file_system.py}                                |  69 +++++-
 .../core/filesystem/filesystem_table_operations.py | 140 +++++++++++
 .../iceberg/core/filesystem/filesystem_tables.py   |  53 +++++
 .../{hadoop => filesystem}/local_filesystem.py     |  65 +++---
 python/iceberg/core/filesystem/s3_filesystem.py    | 257 +++++++++++++++++++++
 python/iceberg/core/{hadoop => filesystem}/util.py |  28 +--
 python/iceberg/core/hadoop/file_system.py          |  25 --
 python/iceberg/core/hadoop/hadoop_output_file.py   |  49 ----
 .../iceberg/core/hadoop/hadoop_table_operations.py |  49 ----
 .../iceberg/core/hadoop/s3_filesystem_wrapper.py   |  54 -----
 python/iceberg/core/table_properties.py            |  77 ++++++
 python/iceberg/core/util/__init__.py               |   7 +-
 python/iceberg/core/util/bin_packing.py            |   1 +
 python/iceberg/spark/__init__.py                   |  19 --
 python/iceberg/spark/source/__init__.py            |  19 --
 python/iceberg/spark/source/spark_catalog.py       |  53 -----
 python/iceberg/spark/table_identifier.py           |  21 --
 python/setup.py                                    |   7 +-
 python/tests/core/test_table_metadata_parser.py    |   7 +-
 python/tox.ini                                     |   2 +-
 25 files changed, 696 insertions(+), 381 deletions(-)
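
The renamed module keeps table access behind the same small set of entry
points. A minimal usage sketch of the new package, based only on the classes
added in this diff (the warehouse path is illustrative):

    from iceberg.core.filesystem import FilesystemTables

    # FilesystemTables picks LocalFileSystem or S3FileSystem via get_fs()
    tables = FilesystemTables()                    # conf defaults to an empty dict
    table = tables.load("/tmp/warehouse/events")   # NoSuchTableException if absent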

diff --git a/python/iceberg/core/avro/avro_schema_util.py b/python/iceberg/core/avro/avro_schema_util.py
index efe7299..5d47890 100644
--- a/python/iceberg/core/avro/avro_schema_util.py
+++ b/python/iceberg/core/avro/avro_schema_util.py
@@ -6,14 +6,14 @@
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
 #
-# http://www.apache.org/licenses/LICENSE-2.0
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 class AvroSchemaUtil(object):
     FIELD_ID_PROP = "field-id"
diff --git a/python/iceberg/core/base_metastore_table_operations.py b/python/iceberg/core/base_metastore_table_operations.py
index b536110..a50f342 100644
--- a/python/iceberg/core/base_metastore_table_operations.py
+++ b/python/iceberg/core/base_metastore_table_operations.py
@@ -18,13 +18,11 @@
 import logging
 import uuid
 
-from iceberg.core.hadoop import (get_fs,
-                                 HadoopInputFile,
-                                 HadoopOutputFile)
 from retrying import retry
 
 from .table_metadata_parser import TableMetadataParser
 from .table_operations import TableOperations
+from .table_properties import TableProperties
 
 _logger = logging.getLogger(__name__)
 
@@ -46,6 +44,7 @@ class BaseMetastoreTableOperations(TableOperations):
         self.current_metadata = None
         self.current_metadata_location = None
         self.base_location = None
+        self.should_refresh = True
         self.version = -1
 
     def current(self):
@@ -59,13 +58,18 @@ class BaseMetastoreTableOperations(TableOperations):
         return "{base_location}/{data}".format(base_location=self.base_location,
                                                data=BaseMetastoreTableOperations.DATA_FOLDER_NAME)
 
+    def request_refresh(self):
+        self.should_refresh = True
+
     def write_new_metadata(self, metadata, version):
+        from .filesystem import FileSystemOutputFile
+
         if self.base_location is None:
             self.base_location = metadata.location
 
         new_filename = BaseMetastoreTableOperations.new_table_metadata_filename(self.base_location,
                                                                                 version)
-        new_metadata_location = HadoopOutputFile.from_path(new_filename, self.conf)
+        new_metadata_location = FileSystemOutputFile.from_path(new_filename, self.conf)
 
         TableMetadataParser.write(metadata, new_metadata_location)
         return new_filename
@@ -73,24 +77,43 @@ class BaseMetastoreTableOperations(TableOperations):
     def refresh_from_metadata_location(self, new_location, num_retries=20):
         if not self.current_metadata_location == new_location:
             _logger.info("Refreshing table metadata from new version: %s" % new_location)
+            self.retryable_refresh(new_location)
 
-        self.retryable_refresh(new_location)
+        self.should_refresh = False
 
     def new_input_file(self, path):
-        return HadoopInputFile.from_location(path, self.conf)
+        from .filesystem import FileSystemInputFile
+
+        return FileSystemInputFile.from_location(path, self.conf)
 
     def new_metadata_file(self, filename):
-        return HadoopOutputFile.from_path(BaseMetastoreTableOperations.new_metadata_location(self.base_location,
-                                                                                             filename),
-                                          self.conf)
+        from .filesystem import FileSystemOutputFile
+
+        return FileSystemOutputFile.from_path(BaseMetastoreTableOperations.new_metadata_location(self.base_location,
+                                                                                                 filename),
+                                              self.conf)
+
+    def metadata_file_location(self, file_name, metadata=None):
+        if metadata is None:
+            return self.metadata_file_location(file_name, metadata=self.current())
+
+        metadata_location = metadata.properties.get(TableProperties.WRITE_METADATA_LOCATION)
+
+        if metadata_location is not None:
+            return "{}/{}".format(metadata_location, file_name)
+        else:
+            return "{}/{}/{}".format(metadata.location, BaseMetastoreTableOperations.METADATA_FOLDER_NAME, file_name)
 
     def delete_file(self, path):
+        from .filesystem import get_fs
         get_fs(path, self.conf).delete(path, False)
 
     @retry(wait_incrementing_start=100, wait_exponential_multiplier=4,
            wait_exponential_max=5000, stop_max_delay=600000, stop_max_attempt_number=2)
     def retryable_refresh(self, location):
-        self.current_metadata = TableMetadataParser.read(self, HadoopInputFile.from_location(location, self.conf))
+        from .filesystem import FileSystemInputFile
+
+        self.current_metadata = TableMetadataParser.read(self, FileSystemInputFile.from_location(location, self.conf))
         self.current_metadata_location = location
         self.base_location = self.current_metadata.location
         self.version = BaseMetastoreTableOperations.parse_version(location)
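
The new metadata_file_location (added above) resolves where auxiliary metadata
files go: a write.metadata.path table property wins, otherwise the table's own
metadata folder. In brief (ops stands for a concrete subclass instance, and the
file name is illustrative):

    ops.metadata_file_location("snap-123.avro")
    # -> "<write.metadata.path>/snap-123.avro"       if the property is set
    # -> "<table location>/metadata/snap-123.avro"   otherwise
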
diff --git a/python/iceberg/core/data_files.py b/python/iceberg/core/data_files.py
index 34d4422..0a50e5b 100644
--- a/python/iceberg/core/data_files.py
+++ b/python/iceberg/core/data_files.py
@@ -20,8 +20,8 @@ from iceberg.api import (FileFormat,
                          Metrics)
 from iceberg.api.types import Conversions
 
+from .filesystem import FileSystemInputFile
 from .generic_data_file import GenericDataFile
-from .hadoop import HadoopInputFile
 from .partition_data import PartitionData
 
 
@@ -78,7 +78,7 @@ class DataFiles(object):
 
     @staticmethod
     def from_input_file(input_file, row_count, partition_data=None, metrics=None):
-        if isinstance(input_file, HadoopInputFile):
+        if isinstance(input_file, FileSystemInputFile):
             return DataFiles.from_stat(input_file.get_stat(), row_count,
                                        partition_data=partition_data, metrics=metrics)
 
@@ -124,6 +124,7 @@ class DataFileBuilder(object):
         self.null_value_counts = None
         self.lower_bounds = None
         self.upper_bounds = None
+        return self
 
     def copy(self, to_copy):
         if self.is_partitioned:
@@ -148,7 +149,7 @@ class DataFileBuilder(object):
         return self
 
     def with_input_file(self, input_file):
-        if isinstance(input_file, HadoopInputFile):
+        if isinstance(input_file, FileSystemInputFile):
             self.with_status(input_file.get_stat())
 
         self.file_path = self.location()
@@ -158,6 +159,7 @@ class DataFileBuilder(object):
 
     def with_path(self, path):
         self.file_path = path
+        return self
 
     def with_format(self, fmt):
         if isinstance(fmt, FileFormat):
diff --git a/python/iceberg/core/hadoop/__init__.py b/python/iceberg/core/filesystem/__init__.py
similarity index 63%
rename from python/iceberg/core/hadoop/__init__.py
rename to python/iceberg/core/filesystem/__init__.py
index 2ac455f..b836e2f 100644
--- a/python/iceberg/core/hadoop/__init__.py
+++ b/python/iceberg/core/filesystem/__init__.py
@@ -15,8 +15,12 @@
 # specific language governing permissions and limitations
 # under the License.
 
-__all__ = ["get_fs", "HadoopInputFile", "HadoopOutputFile"]
+__all__ = ["get_fs", "FileStatus", "FileSystem", "FileSystemInputFile", "FileSystemOutputFile",
+           "FilesystemTableOperations", "FilesystemTables", "S3File", "S3FileSystem"]
 
-from .hadoop_input_file import HadoopInputFile
-from .hadoop_output_file import HadoopOutputFile
+from .file_status import FileStatus
+from .file_system import FileSystem, FileSystemInputFile, FileSystemOutputFile
+from .filesystem_table_operations import FilesystemTableOperations
+from .filesystem_tables import FilesystemTables
+from .s3_filesystem import S3File, S3FileSystem
 from .util import get_fs
diff --git a/python/iceberg/core/hadoop/file_status.py b/python/iceberg/core/filesystem/file_status.py
similarity index 100%
rename from python/iceberg/core/hadoop/file_status.py
rename to python/iceberg/core/filesystem/file_status.py
diff --git a/python/iceberg/core/hadoop/hadoop_input_file.py b/python/iceberg/core/filesystem/file_system.py
similarity index 50%
rename from python/iceberg/core/hadoop/hadoop_input_file.py
rename to python/iceberg/core/filesystem/file_system.py
index 66049dd..1ed2029 100644
--- a/python/iceberg/core/hadoop/hadoop_input_file.py
+++ b/python/iceberg/core/filesystem/file_system.py
@@ -17,12 +17,33 @@
 
 import gzip
 
-from iceberg.api.io import InputFile
+from iceberg.api.io import InputFile, OutputFile
 
 from .util import get_fs
 
 
-class HadoopInputFile(InputFile):
+class FileSystem(object):
+
+    def open(self, path, mode='rb'):
+        raise NotImplementedError()
+
+    def create(self, path, overwrite=False):
+        raise NotImplementedError()
+
+    def exists(self, path):
+        raise NotImplementedError()
+
+    def delete(self, path):
+        raise NotImplementedError()
+
+    def stat(self, path):
+        raise NotImplementedError()
+
+    def rename(self, src, dest):
+        raise NotImplementedError()
+
+
+class FileSystemInputFile(InputFile):
 
     def __init__(self, fs, path, conf, length=None, stat=None):
         self.fs = fs
@@ -34,7 +55,7 @@ class HadoopInputFile(InputFile):
     @staticmethod
     def from_location(location, conf):
         fs = get_fs(location, conf)
-        return HadoopInputFile(fs, location, conf)
+        return FileSystemInputFile(fs, location, conf)
 
     def location(self):
         return self.path
@@ -57,5 +78,43 @@ class HadoopInputFile(InputFile):
             for line in fo:
                 yield line
 
-    def new_fo(self):
-        return self.fs.open(self.location())
+    def new_fo(self, mode="rb"):
+        return self.fs.open(self.location(), mode=mode)
+
+    def __repr__(self):
+        return "FileSystemInputFile({})".format(self.path)
+
+    def __str__(self):
+        return self.__repr__()
+
+
+class FileSystemOutputFile(OutputFile):
+
+    @staticmethod
+    def from_path(path, conf):
+        return FileSystemOutputFile(path, conf)
+
+    def __init__(self, path, conf):
+        self.path = path
+        self.conf = conf
+
+    def create(self, mode="w"):
+        fs = get_fs(self.path, self.conf)
+        if fs.exists(self.path):
+            raise RuntimeError("File %s already exists" % self.path)
+
+        return fs.open(self.path, mode=mode)
+
+    def create_or_overwrite(self):
+        fs = get_fs(self.path, self.conf)
+
+        return fs.open(self.path, "wb")
+
+    def location(self):
+        return str(self.path)
+
+    def __repr__(self):
+        return "FileSystemOutputFile({})".format(self.path)
+
+    def __str__(self):
+        return self.__repr__()
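
FileSystemInputFile and FileSystemOutputFile replace HadoopInputFile and
HadoopOutputFile but keep the same factory methods. A sketch of the write/read
round trip through a local path (the path is illustrative):

    from iceberg.core.filesystem import FileSystemInputFile, FileSystemOutputFile

    out = FileSystemOutputFile.from_path("/tmp/example.txt", {})
    with out.create() as fo:              # raises RuntimeError if the path exists
        fo.write("hello")

    inp = FileSystemInputFile.from_location("/tmp/example.txt", {})
    with inp.new_fo(mode="r") as fo:      # default mode is "rb"
        print(fo.read())
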
diff --git a/python/iceberg/core/filesystem/filesystem_table_operations.py b/python/iceberg/core/filesystem/filesystem_table_operations.py
new file mode 100644
index 0000000..1215654
--- /dev/null
+++ b/python/iceberg/core/filesystem/filesystem_table_operations.py
@@ -0,0 +1,140 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+from pathlib import Path
+import uuid
+
+from iceberg.exceptions import CommitFailedException, ValidationException
+
+from .file_system import FileSystemInputFile, FileSystemOutputFile
+from .util import get_fs
+from ..table_metadata_parser import TableMetadataParser
+from ..table_operations import TableOperations
+from ..table_properties import TableProperties
+
+_logger = logging.getLogger(__name__)
+
+
+class FilesystemTableOperations(TableOperations):
+
+    def __init__(self, location, conf):
+        self.conf = conf
+        self.location = Path(location)
+        self.should_refresh = True
+        self.version = None
+        self.current_metadata = None
+
+    def current(self):
+        if self.should_refresh:
+            return self.refresh()
+
+        return self.current_metadata
+
+    def refresh(self):
+        ver = self.version if self.version is not None else self.read_version_hint()
+        metadata_file = self.metadata_file(ver)
+        fs = get_fs(str(metadata_file), self.conf)
+
+        if ver is not None and not fs.exists(str(metadata_file)):
+            if ver == 0:
+                return None
+            raise ValidationException("Metadata file is missing: %s" % metadata_file)
+
+        while fs.exists(str(self.metadata_file(ver + 1))):
+            ver += 1
+            metadata_file = self.metadata_file(ver)
+
+        self.version = ver
+        self.current_metadata = TableMetadataParser.read(self, FileSystemInputFile.from_location(str(metadata_file),
+                                                                                                 self.conf))
+        self.should_refresh = False
+        return self.current_metadata
+
+    def commit(self, base, metadata):
+        if base != self.current():
+            raise CommitFailedException("Cannot commit changes based on stale table metadata")
+
+        if not (base is None or base.location == metadata.location):
+            raise RuntimeError("Filesystem path-based tables cannot be relocated")
+        if TableProperties.WRITE_METADATA_LOCATION in metadata.properties:
+            raise RuntimeError("Hadoop path-based tables cannot be relocated")
+
+        temp_metadata_file = self.metadata_path("{}{}".format(uuid.uuid4(),
+                                                              TableMetadataParser.get_file_extension(self.conf)))
+        TableMetadataParser.write(metadata, FileSystemOutputFile.from_path(str(temp_metadata_file), self.conf))
+
+        next_version = (self.version if self.version is not None else 0) + 1
+        final_metadata_file = self.metadata_file(next_version)
+        fs = get_fs(str(final_metadata_file), self.conf)
+
+        if fs.exists(str(final_metadata_file)):
+            raise CommitFailedException("Version %s already exists: %s" % (next_version, final_metadata_file))
+
+        if not fs.rename(temp_metadata_file, final_metadata_file):
+            raise CommitFailedException("Failed to commit changes using rename: %s" % final_metadata_file)
+
+        self.write_version_hint(next_version)
+        self.should_refresh = True
+
+    def new_input_file(self, path):
+        return FileSystemInputFile.from_location(path, self.conf)
+
+    def new_output_file(self, path):
+        return FileSystemOutputFile.from_path(path, self.conf)
+
+    def new_metadata_file(self, filename):
+        raise RuntimeError("Not yet implemented")
+
+    def delete_file(self, path):
+        get_fs(path, self.conf).delete(path)
+
+    def new_snapshot_id(self):
+        raise RuntimeError("Not yet implemented")
+
+    def metadata_file_location(self, file):
+        return str(self.metadata_path(file))
+
+    def metadata_file(self, version):
+        return self.metadata_path("v{}{}".format(version, TableMetadataParser.get_file_extension(self.conf)))
+
+    def metadata_path(self, filename):
+        return self.location / Path("metadata") / Path(filename)
+
+    def version_hint_file(self):
+        return self.metadata_path("version-hint.text")
+
+    def read_version_hint(self):
+        version_hint_file = str(self.version_hint_file())
+        fs = get_fs(version_hint_file, self.conf)
+
+        if not fs.exists(version_hint_file):
+            return 0
+        else:
+            with fs.open(version_hint_file, "r") as fo:
+                return int(fo.readline().replace("\n", ""))
+
+    def write_version_hint(self, version):
+        version_hint_file = str(self.version_hint_file())
+        fs = get_fs(version_hint_file, self.conf)
+        try:
+
+            with fs.create(version_hint_file, True) as fo:
+                fo.write("{}".format(version))
+
+        except RuntimeError as e:
+            _logger.warning("Unable to update version hint: %s" % e)
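
FilesystemTableOperations resolves table state from a metadata directory plus
a version-hint.text file. How the path helpers above compose, assuming an
empty conf (with compression off the extension is .metadata.json; the location
is illustrative):

    from iceberg.core.filesystem import FilesystemTableOperations

    ops = FilesystemTableOperations("/tmp/warehouse/events", conf={})
    ops.metadata_file(3)       # /tmp/warehouse/events/metadata/v3.metadata.json
    ops.version_hint_file()    # /tmp/warehouse/events/metadata/version-hint.text
    ops.read_version_hint()    # 0 until a commit writes the hint file
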
diff --git a/python/iceberg/core/filesystem/filesystem_tables.py b/python/iceberg/core/filesystem/filesystem_tables.py
new file mode 100644
index 0000000..77bd751
--- /dev/null
+++ b/python/iceberg/core/filesystem/filesystem_tables.py
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from iceberg.api import Tables
+from iceberg.exceptions import NoSuchTableException
+
+from .filesystem_table_operations import FilesystemTableOperations
+from ..table_metadata import TableMetadata
+
+
+class FilesystemTables(Tables):
+
+    def __init__(self, conf=None):
+        self.conf = conf if conf is not None else dict()
+
+    def load(self, location):
+        from ..base_table import BaseTable
+        ops = self.new_table_ops(location)
+        if ops.current() is None:
+            raise NoSuchTableException("Table does not exist at location: %s" % location)
+
+        return BaseTable(ops, location)
+
+    def create(self, schema, table_identifier=None, spec=None, properties=None, location=None):
+        from ..base_table import BaseTable
+        spec, properties = super(FilesystemTables, self).default_args(spec, properties)
+        ops = self.new_table_ops(location)
+
+        metadata = TableMetadata.new_table_metadata(ops, schema, spec, location, properties)
+        ops.commit(None, metadata)
+
+        return BaseTable(ops, location)
+
+    def new_table_ops(self, location):
+        if location is None:
+            raise RuntimeError("location cannot be None")
+
+        return FilesystemTableOperations(location, self.conf)
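
FilesystemTables.create commits the first metadata version at the given
location via the rename-based commit above. A sketch, assuming the Schema and
type helpers in iceberg.api mirror the Java API (the schema and path are
illustrative):

    from iceberg.api import Schema
    from iceberg.api.types import IntegerType, NestedField, StringType
    from iceberg.core.filesystem import FilesystemTables

    schema = Schema(NestedField.required(1, "id", IntegerType.get()),
                    NestedField.optional(2, "data", StringType.get()))

    tables = FilesystemTables()
    table = tables.create(schema, location="/tmp/warehouse/events")  # location is required
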
diff --git a/python/iceberg/core/hadoop/local_filesystem.py b/python/iceberg/core/filesystem/local_filesystem.py
similarity index 63%
rename from python/iceberg/core/hadoop/local_filesystem.py
rename to python/iceberg/core/filesystem/local_filesystem.py
index aa55de9..826700a 100644
--- a/python/iceberg/core/hadoop/local_filesystem.py
+++ b/python/iceberg/core/filesystem/local_filesystem.py
@@ -15,8 +15,10 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import errno
 import os
 import stat
+from pathlib import Path
 
 from .file_status import FileStatus
 from .file_system import FileSystem
@@ -36,12 +38,23 @@
             LocalFileSystem.fs_inst = self
 
     def open(self, path, mode='rb'):
+        open_path = Path(LocalFileSystem.fix_path(path))
 
-        return open(LocalFileSystem.fix_path(path), mode=mode)
+        if "w" in mode and not open_path.parents[0].exists():
+            try:
+                open_path.parents[0].mkdir(parents=True)
+            except OSError as exc:
+                if exc.errno != errno.EEXIST:
+                    raise
+
+        return open(open_path, mode=mode)
+
+    def delete(self, path):
+        raise NotImplementedError()
 
     def stat(self, path):
         st = os.stat(LocalFileSystem.fix_path(path))
         return FileStatus(path=path, length=st.st_size, is_dir=stat.S_ISDIR(st.st_mode),
                           blocksize=st.st_blksize, modification_time=st.st_mtime, access_time=st.st_atime,
                           permission=st.st_mode, owner=st.st_uid, group=st.st_gid)
 
@@ -51,34 +64,19 @@
             path = str(path[7:])
         return path
 
-    # def ls(self, path):
-    #     return os.listdir(path)
-    #
-    # def delete(self, path, recursive=False):
-    #     return os.remove(path)
-    #
-
-    #
-    # def rename(self, path, new_path):
-    #     return os.rename(path, new_path)
-    #
-    # def mkdir(self, path, create_parents=True):
-    #     if create_parents:
-    #         return os.makedirs(path)
-    #
-    #     return os.mkdir(path)
-    #
-    # def exists(self, path):
-    #     return os.path.isfile(path)
-    #
-    # def isdir(self, path):
-    #     return os.path.isdir(path)
-    #
-    # def isfile(self, path):
-    #     return os.path.isfile(path)
-    #
-    # def _isfilestore(self):
-    #     return True
-    #
-    # def open(self, path, mode='rb'):
-    #     return open(path, mode=mode)
+    def create(self, path, overwrite=False):
+        if os.path.exists(path) and not overwrite:
+            raise RuntimeError("Path %s already exists" % path)
+
+        return open(path, "w")
+
+    def rename(self, src, dest):
+        try:
+            os.rename(src, dest)
+        except OSError:
+            return False
+
+        return True
+
+    def exists(self, path):
+        return os.path.exists(path)
diff --git a/python/iceberg/core/filesystem/s3_filesystem.py b/python/iceberg/core/filesystem/s3_filesystem.py
new file mode 100644
index 0000000..9423ee0
--- /dev/null
+++ b/python/iceberg/core/filesystem/s3_filesystem.py
@@ -0,0 +1,257 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import io
+import logging
+import re
+import time
+from urllib.parse import urlparse
+
+import boto3
+from botocore.credentials import RefreshableCredentials
+from botocore.exceptions import ClientError
+from botocore.session import get_session
+from retrying import retry
+
+from .file_status import FileStatus
+from .file_system import FileSystem
+
+_logger = logging.getLogger(__name__)
+
+
+S3_CLIENT = dict()
+BOTO_STS_CLIENT = boto3.client('sts')
+CONF = None
+ROLE_ARN = "default"
+AUTOREFRESH_SESSION = None
+
+
+
+@retry(wait_incrementing_start=100, wait_exponential_multiplier=4,
+       wait_exponential_max=5000, stop_max_delay=600000, stop_max_attempt_number=7)
+def get_s3(obj="resource"):
+    global AUTOREFRESH_SESSION
+    global ROLE_ARN
+    if ROLE_ARN not in S3_CLIENT:
+        S3_CLIENT[ROLE_ARN] = dict()
+
+    if ROLE_ARN == "default":
+        if AUTOREFRESH_SESSION is None:
+            AUTOREFRESH_SESSION = boto3.Session()
+        S3_CLIENT["default"]["resource"] = AUTOREFRESH_SESSION.resource('s3')
+        S3_CLIENT["default"]["client"] = AUTOREFRESH_SESSION.client('s3')
+    else:
+        if AUTOREFRESH_SESSION is None:
+            sess = get_session()
+            sess._credentials = RefreshableCredentials.create_from_metadata(metadata=refresh_sts_session_keys(),
+                                                                            refresh_using=refresh_sts_session_keys,
+                                                                            method="sts-assume-role")
+            AUTOREFRESH_SESSION = boto3.Session(botocore_session=sess)
+
+        S3_CLIENT[ROLE_ARN]["resource"] = AUTOREFRESH_SESSION.resource("s3")
+        S3_CLIENT[ROLE_ARN]["client"] = AUTOREFRESH_SESSION.client("s3")
+
+    return S3_CLIENT.get(ROLE_ARN, dict()).get(obj)
+
+
+def refresh_sts_session_keys():
+    params = {"RoleArn": ROLE_ARN,
+              "RoleSessionName": "iceberg_python_client_{}".format(int(time.time() * 1000.00))}
+
+    sts_creds = BOTO_STS_CLIENT.assume_role(**params).get("Credentials")
+    credentials = {"access_key": sts_creds.get("AccessKeyId"),
+                   "secret_key": sts_creds.get("SecretAccessKey"),
+                   "token": sts_creds.get("SessionToken"),
+                   "expiry_time": sts_creds.get("Expiration").isoformat()}
+    return credentials
+
+
+def url_to_bucket_key_name_tuple(url):
+    parsed_url = urlparse(url)
+    return parsed_url.netloc, parsed_url.path[1:], parsed_url.path.split("/")[-1]
+
+
+class S3FileSystem(FileSystem):
+    fs_inst = None
+
+    @staticmethod
+    def get_instance():
+        if S3FileSystem.fs_inst is None:
+            S3FileSystem()
+        return S3FileSystem.fs_inst
+
+    def __init__(self):
+        if S3FileSystem.fs_inst is None:
+            S3FileSystem.fs_inst = self
+
+    def set_conf(self, conf):
+        global CONF
+
+        CONF = conf
+        self.set_role(CONF.get("hive.aws_iam_role", "default"))
+
+    def set_role(self, role):
+        global ROLE_ARN
+
+        if role is not None:
+            ROLE_ARN = role
+
+    def exists(self, path):
+        try:
+            self.info(path)
+        except ClientError as ce:
+            if ce.response['Error']['Code'] == "404":
+                return False
+            else:
+                raise
+
+        return True
+
+    def open(self, path, mode='rb'):
+        return S3File(path, mode=mode)
+
+    def delete(self, path):
+        bucket, key, _ = url_to_bucket_key_name_tuple(S3FileSystem.normalize_s3_path(path))
+        get_s3().Object(bucket_name=bucket,
+                        key=key).delete()
+
+    def stat(self, path):
+        st = self.info(S3FileSystem.normalize_s3_path(path))
+
+        return FileStatus(path=path, length=st.get("ContentLength"), is_dir=False,
+                          blocksize=None, modification_time=st.get("LastModified"), access_time=None,
+                          permission=None, owner=None, group=None)
+
+    @staticmethod
+    def info(url):
+        bucket, key, _ = url_to_bucket_key_name_tuple(url)
+        return get_s3("client").head_object(Bucket=bucket,
+                                            Key=key)
+
+    @staticmethod
+    def normalize_s3_path(path):
+        return re.sub(r'^(s3n|s3a)://', 's3://', path)
+
+
+class S3File(object):
+    MAX_CHUNK_SIZE = 4 * 1048576
+    MIN_CHUNK_SIZE = 2 * 65536
+
+    def __init__(self, path, mode="rb"):
+        self.path = path
+        bucket, key, name = url_to_bucket_key_name_tuple(S3FileSystem.normalize_s3_path(path))
+        self.curr_pos = 0
+        self.obj = (get_s3()
+                    .Object(bucket_name=bucket,
+                            key=key))
+        self.name = name
+        if mode.startswith("r"):
+            self.size = self.obj.content_length
+
+        self.isatty = False
+        self.closed = False
+
+        self.mode = mode
+
+        self.buffer_remote_reads = True
+
+        self.curr_buffer = None
+        self.curr_buffer_start = -1
+        self.curr_buffer_end = -1
+        self.buffer_reads = 0
+        self.buffer_hits = 0
+
+        self.chunk_size = self.MAX_CHUNK_SIZE
+
+    def close(self):
+        self.closed = True
+
+    def flush(self):
+        pass
+
+    def next(self):
+        return next(self.readline())
+
+    def read(self, n=0):
+        if self.curr_pos >= self.size:
+            return None
+        if self.buffer_remote_reads:
+            stream = self._read_from_buffer(n)
+        else:
+            if n <= 0:
+                n = self.size
+                stream = self.obj.get(Range='bytes={}-{}'.format(self.curr_pos,
+                                                                 self.size - 1))['Body'].read()
+            else:
+                stream = self.obj.get(Range='bytes={}-{}'.format(self.curr_pos, self.curr_pos + n - 1))['Body'].read()
+
+        self.curr_pos = min(self.curr_pos + n, self.size)
+        return stream
+
+    def _read_from_buffer(self, n):
+        self.buffer_reads += 1
+        # if the buffer is none or if the entire read is not contained
+        # in our current buffer fill the buffer
+        if self.curr_buffer is None or not (self.curr_buffer_start <= self.curr_pos
+                                            and self.curr_pos + n < self.curr_buffer_end):
+            self.curr_buffer_start = self.curr_pos
+            self.curr_buffer_end = self.curr_pos + max(self.chunk_size, n)
+            byte_range = 'bytes={}-{}'.format(self.curr_buffer_start,
+                                              self.curr_buffer_end)
+            self.curr_buffer = io.BytesIO(self.obj.get(Range=byte_range)['Body'].read())
+        else:
+            self.buffer_hits += 1
+
+        # position the buffer at the requested offset; the internal cursor
+        # may have advanced on an earlier read even when the offsets match
+        self.curr_buffer.seek(self.curr_pos - self.curr_buffer_start)
+
+        return self.curr_buffer.read(n)
+
+    def readline(self, n=0):
+        if self.curr_buffer is None:
+            self.curr_buffer = io.BytesIO(self.obj.get()['Body'].read())
+        for line in self.curr_buffer:
+            yield line
+
+    def seek(self, offset, whence=0):
+        if whence == 0:
+            self.curr_pos = offset
+        elif whence == 1:
+            self.curr_pos += offset
+        elif whence == 2:
+            self.curr_pos = self.size + offset
+
+    def tell(self):
+        return self.curr_pos
+
+    def write(self, string):
+        resp = self.obj.put(Body=string)
+        if not resp.get("ResponseMetadata", dict()).get("HTTPStatusCode") == 200:
+            raise RuntimeError("Unable to write to {}".format(self.path))
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        return self.next()
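
S3File reads through a buffered ranged GET: each remote fetch pulls up to
MAX_CHUNK_SIZE (4 MiB), and later reads or seeks that land inside the buffer
skip the round trip. A sketch (the bucket and key are illustrative, and AWS
credentials are assumed to be available to boto3):

    from iceberg.core.filesystem import S3FileSystem

    fs = S3FileSystem.get_instance()
    fs.set_conf({})                        # no "hive.aws_iam_role" -> default session
    with fs.open("s3://some-bucket/some/key") as fo:
        head = fo.read(16)                 # first read fills the 4 MiB buffer
        fo.seek(0)
        head_again = fo.read(16)           # served from the buffer, no second GET
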
diff --git a/python/iceberg/core/hadoop/util.py b/python/iceberg/core/filesystem/util.py
similarity index 60%
rename from python/iceberg/core/hadoop/util.py
rename to python/iceberg/core/filesystem/util.py
index 8516e1e..d73dc11 100644
--- a/python/iceberg/core/hadoop/util.py
+++ b/python/iceberg/core/filesystem/util.py
@@ -15,16 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import time
 from urllib.parse import urlparse
 
-import boto3
-
-from .local_filesystem import LocalFileSystem
-from .s3_filesystem_wrapper import S3FileSystemWrapper
-
 
 def get_fs(path, conf, local_only=False):
+    from .local_filesystem import LocalFileSystem
+    from .s3_filesystem import S3FileSystem
+
     if local_only:
         return LocalFileSystem.get_instance()
     else:
@@ -32,22 +29,11 @@ def get_fs(path, conf, local_only=False):
 
         if parsed_path.scheme in ["", "file"]:
             return LocalFileSystem.get_instance()
-        elif parsed_path.scheme in ["s3", "s3n"]:
-            if conf.get("hive.aws_iam_role") is not None:
-                key, secret, token = get_sts_session_keys(conf.get("hive.aws_iam_role"))
-                return S3FileSystemWrapper(key=key, secret=secret, token=token)
-            return S3FileSystemWrapper()
+        elif parsed_path.scheme in ["s3", "s3n", "s3a"]:
+            fs = S3FileSystem.get_instance()
+            fs.set_conf(conf)
+            return fs
         elif parsed_path.scheme in ["hdfs"]:
             raise RuntimeError("Hadoop FS not implemented")
 
     raise RuntimeError("No filesystem found for this location: %s" % path)
-
-
-def get_sts_session_keys(role_arn):
-    client = boto3.client('sts')
-    response = client.assume_role(
-        RoleArn=role_arn,
-        RoleSessionName='iceberg_python_client_{}'.format(int(time.time())))
-    return (response["Credentials"]["AccessKeyId"],
-            response["Credentials"]["SecretAccessKey"],
-            response["Credentials"]["SessionToken"])
diff --git a/python/iceberg/core/hadoop/file_system.py b/python/iceberg/core/hadoop/file_system.py
deleted file mode 100644
index 2630227..0000000
--- a/python/iceberg/core/hadoop/file_system.py
+++ /dev/null
@@ -1,25 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-
-class FileSystem(object):
-
-    def open(self, path, mode='rb'):
-        raise NotImplementedError()
-
-    def stat(self, path):
-        raise NotImplementedError()
diff --git a/python/iceberg/core/hadoop/hadoop_output_file.py b/python/iceberg/core/hadoop/hadoop_output_file.py
deleted file mode 100644
index 4c0e243..0000000
--- a/python/iceberg/core/hadoop/hadoop_output_file.py
+++ /dev/null
@@ -1,49 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from iceberg.api.io import OutputFile
-
-from .util import get_fs
-
-
-class HadoopOutputFile(OutputFile):
-
-    @staticmethod
-    def from_path(path, conf):
-        return HadoopOutputFile(path, conf)
-
-    def __init__(self, path, conf):
-        self.path = path
-        self.conf = conf
-
-    def create(self):
-        fs = get_fs(self.path, self.conf)
-        if fs.exists():
-            raise RuntimeError("File %s already exists" % self.path)
-
-        return fs.open(self.path)
-
-    def create_or_overwrite(self):
-        fs = get_fs(self.path, self.conf)
-
-        return fs.open(self.path, "wb")
-
-    def location(self):
-        return str(self.path)
-
-    def __str__(self):
-        return self.location()
diff --git a/python/iceberg/core/hadoop/hadoop_table_operations.py b/python/iceberg/core/hadoop/hadoop_table_operations.py
deleted file mode 100644
index e771084..0000000
--- a/python/iceberg/core/hadoop/hadoop_table_operations.py
+++ /dev/null
@@ -1,49 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import iceberg.core as core
-
-
-class HadoopTableOperations(core.TableOperations):
-
-    def __init__(self, location, conf):
-        self.conf = conf
-        self.location = location
-        self.should_refresh = True
-        self.verison = None
-        self.current_metadata = None
-
-    def current(self):
-        pass
-
-    def refresh(self):
-        raise RuntimeError("Not yet implemented")
-
-    def commit(self, base, metadata):
-        raise RuntimeError("Not yet implemented")
-
-    def new_input_file(self, path):
-        raise RuntimeError("Not yet implemented")
-
-    def new_metadata_file(self, filename):
-        raise RuntimeError("Not yet implemented")
-
-    def delete_file(self, path):
-        raise RuntimeError("Not yet implemented")
-
-    def new_snapshot_id(self):
-        raise RuntimeError("Not yet implemented")
diff --git a/python/iceberg/core/hadoop/s3_filesystem_wrapper.py b/python/iceberg/core/hadoop/s3_filesystem_wrapper.py
deleted file mode 100644
index f1fd713..0000000
--- a/python/iceberg/core/hadoop/s3_filesystem_wrapper.py
+++ /dev/null
@@ -1,54 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import s3fs
-
-from .file_status import FileStatus
-from .file_system import FileSystem
-
-
-class S3FileSystemWrapper(FileSystem):
-
-    def __init__(self, anon=False, key=None, secret=None, token=None):
-        self._fs = s3fs.S3FileSystem(anon=anon, key=key, secret=secret, token=token)
-
-    def open(self, path, mode='rb'):
-        return self._fs.open(S3FileSystemWrapper.fix_s3_path(path), mode=mode)
-
-    def stat(self, path):
-        is_dir = False
-        st = dict()
-        try:
-            st = self._fs.info(S3FileSystemWrapper.fix_s3_path(path), refresh=True)
-        except RuntimeError:
-            # must be a directory or subsequent du will fail
-            du = self._fs.du(S3FileSystemWrapper.fix_s3_path(path), refresh=True)
-            if len(du) > 0:
-                is_dir = True
-            length = 0
-            for key, val in du.items():
-                length += val
-
-        return FileStatus(path=path, length=st.get("Size", length), is_dir=is_dir,
-                          blocksize=None, modification_time=st.get("LastModified"), access_time=None,
-                          permission=None, owner=None, group=None)
-
-    @staticmethod
-    def fix_s3_path(path):
-        if path.startswith("s3n"):
-            path = "s3" + str(path[3:])
-        return path
diff --git a/python/iceberg/core/table_properties.py b/python/iceberg/core/table_properties.py
new file mode 100644
index 0000000..0511142
--- /dev/null
+++ b/python/iceberg/core/table_properties.py
@@ -0,0 +1,77 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+
+class TableProperties(object):
+    COMMIT_NUM_RETRIES = "commit.retry.num-retries"
+    COMMIT_NUM_RETRIES_DEFAULT = 4
+
+    COMMIT_MIN_RETRY_WAIT_MS = "commit.retry.min-wait-ms"
+    COMMIT_MIN_RETRY_WAIT_MS_DEFAULT = 100
+
+    COMMIT_MAX_RETRY_WAIT_MS = "commit.retry.max-wait-ms"
+    COMMIT_MAX_RETRY_WAIT_MS_DEFAULT = 60000
+
+    COMMIT_TOTAL_RETRY_TIME_MS = "commit.retry.total-timeout-ms"
+    COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT = 1800000
+
+    MANIFEST_TARGET_SIZE_BYTES = "commit.manifest.target-size-bytes"
+    MANIFEST_TARGET_SIZE_BYTES_DEFAULT = 8388608
+
+    MANIFEST_MIN_MERGE_COUNT = "commit.manifest.min-count-to-merge"
+    MANIFEST_MIN_MERGE_COUNT_DEFAULT = 100
+
+    DEFAULT_FILE_FORMAT = "write.format.default"
+    DEFAULT_FILE_FORMAT_DEFAULT = "parquet"
+
+    PARQUET_ROW_GROUP_SIZE_BYTES = "write.parquet.row-group-size-bytes"
+    PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT = "134217728"
+
+    PARQUET_PAGE_SIZE_BYTES = "write.parquet.page-size-bytes"
+    PARQUET_PAGE_SIZE_BYTES_DEFAULT = "1048576"
+
+    PARQUET_DICT_SIZE_BYTES = "write.parquet.dict-size-bytes"
+    PARQUET_DICT_SIZE_BYTES_DEFAULT = "2097152"
+
+    PARQUET_COMPRESSION = "write.parquet.compression-codec"
+    PARQUET_COMPRESSION_DEFAULT = "gzip"
+
+    AVRO_COMPRESSION = "write.avro.compression-codec"
+    AVRO_COMPRESSION_DEFAULT = "gzip"
+
+    SPLIT_SIZE = "read.split.target-size"
+    SPLIT_SIZE_DEFAULT = 134217728
+
+    SPLIT_LOOKBACK = "read.split.planning-lookback"
+    SPLIT_LOOKBACK_DEFAULT = 10
+
+    SPLIT_OPEN_FILE_COST = "read.split.open-file-cost"
+    SPLIT_OPEN_FILE_COST_DEFAULT = 4 * 1024 * 1024
+
+    OBJECT_STORE_ENABLED = "write.object-storage.enabled"
+    OBJECT_STORE_ENABLED_DEFAULT = False
+
+    OBJECT_STORE_PATH = "write.object-storage.path"
+
+    WRITE_NEW_DATA_LOCATION = "write.folder-storage.path"
+
+    WRITE_METADATA_LOCATION = "write.metadata.path"
+
+    MANIFEST_LISTS_ENABLED = "write.manifest-lists.enabled"
+
+    MANIFEST_LISTS_ENABLED_DEFAULT = True
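
TableProperties is a plain constants holder pairing each property key with a
*_DEFAULT value where one exists. The lookup pattern used in
base_metastore_table_operations.py above generalizes to any property
(metadata stands for a TableMetadata instance):

    from iceberg.core.table_properties import TableProperties

    fmt = metadata.properties.get(TableProperties.DEFAULT_FILE_FORMAT,
                                  TableProperties.DEFAULT_FILE_FORMAT_DEFAULT)  # "parquet"
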
diff --git a/python/iceberg/core/util/__init__.py b/python/iceberg/core/util/__init__.py
index 5df92bc..ba17648 100644
--- a/python/iceberg/core/util/__init__.py
+++ b/python/iceberg/core/util/__init__.py
@@ -15,6 +15,11 @@
 # specific language governing permissions and limitations
 # under the License.
 
-__all__ = ["AtomicInteger"]
+__all__ = ["AtomicInteger", "PackingIterator", "str_as_bool"]
 
 from .atomic_integer import AtomicInteger
+from .bin_packing import PackingIterator
+
+
+def str_as_bool(str_var):
+    return str_var is not None and str_var.lower() == "true"
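
str_as_bool is deliberately strict: only the case-insensitive string "true"
is treated as truthy.

    from iceberg.core.util import str_as_bool

    str_as_bool("true")    # True
    str_as_bool("TRUE")    # True  (lower-cased before comparison)
    str_as_bool("1")       # False (only the literal word "true" counts)
    str_as_bool(None)      # False
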
diff --git a/python/iceberg/core/util/bin_packing.py b/python/iceberg/core/util/bin_packing.py
index 1ccca2e..2b7f0e1 100644
--- a/python/iceberg/core/util/bin_packing.py
+++ b/python/iceberg/core/util/bin_packing.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+
 from copy import deepcopy
 
 
diff --git a/python/iceberg/spark/__init__.py b/python/iceberg/spark/__init__.py
deleted file mode 100644
index ede72c0..0000000
--- a/python/iceberg/spark/__init__.py
+++ /dev/null
@@ -1,19 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# flake8: noqa
-
-from .table_identifier import TableIdentifier
diff --git a/python/iceberg/spark/source/__init__.py b/python/iceberg/spark/source/__init__.py
deleted file mode 100644
index cc91ef4..0000000
--- a/python/iceberg/spark/source/__init__.py
+++ /dev/null
@@ -1,19 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# flake8: noqa
-
-from .spark_catalog import SparkCatalog
diff --git a/python/iceberg/spark/source/spark_catalog.py b/python/iceberg/spark/source/spark_catalog.py
deleted file mode 100644
index 858be62..0000000
--- a/python/iceberg/spark/source/spark_catalog.py
+++ /dev/null
@@ -1,53 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-class SparkCatalog(object):
-
-    def create(self, ident, schema, spec, properties):
-        pass
-
-    def load(self, ident):
-        pass
-
-    def drop(self, ident):
-        pass
-
-    def load_table(self, ident):
-        pass
-
-    def create_table(self, ident, table_type, partitions, properties):
-        pass
-
-    def apply_property_changes(self, table, changes):
-        pass
-
-    def apply_schema_changes(self, table, changes):
-        pass
-
-    def drop_table(self, ident):
-        pass
-
-    def initialize(self, name, options):
-        self.name = name
-        self.options = options
-
-    def convert(self, schema, partitioning):
-        pass
-
-    def __init__(self):
-        self.name = ""
-        self.options = None
diff --git a/python/iceberg/spark/table_identifier.py b/python/iceberg/spark/table_identifier.py
deleted file mode 100644
index 84d04f1..0000000
--- a/python/iceberg/spark/table_identifier.py
+++ /dev/null
@@ -1,21 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-class TableIdentifier(object):
-
-    def __init__(self):
-        pass
diff --git a/python/setup.py b/python/setup.py
index 5a57101..20f01f2 100755
--- a/python/setup.py
+++ b/python/setup.py
@@ -26,12 +26,13 @@ setup(
     keywords='iceberg',
     url='https://github.com/apache/incubator-iceberg/blob/master/README.md',
     python_requires='>=3.4',
-    install_requires=['boto3',
+    install_requires=['botocore',
+                      'boto3',
                       'fastavro',
                       'mmh3',
                       'python-dateutil',
                       'pytz',
-                      'retrying',
-                      's3fs'],
+                      'requests',
+                      'retrying'],
     setup_requires=['setupmeta']
 )
diff --git a/python/tests/core/test_table_metadata_parser.py b/python/tests/core/test_table_metadata_parser.py
index d9d3b28..9c8ad2c 100644
--- a/python/tests/core/test_table_metadata_parser.py
+++ b/python/tests/core/test_table_metadata_parser.py
@@ -17,18 +17,17 @@
 
 import binascii
 
-from iceberg.api import Files
 from iceberg.core import ConfigProperties, TableMetadataParser
-from iceberg.core.hadoop import HadoopInputFile
+from iceberg.core.filesystem import FileSystemInputFile, FileSystemOutputFile
 
 
 def test_compression_property(expected, prop):
     config = {ConfigProperties.COMPRESS_METADATA: prop}
-    output_file = Files.local_output(TableMetadataParser.get_file_extension(config))
+    output_file = FileSystemOutputFile.from_path(TableMetadataParser.get_file_extension(config), dict())
     TableMetadataParser.write(expected, output_file)
     assert prop == is_compressed(TableMetadataParser.get_file_extension(config))
     read = TableMetadataParser.read(None,
-                                    HadoopInputFile.from_location(TableMetadataParser.get_file_extension(config), None))
+                                    FileSystemInputFile.from_location(TableMetadataParser.get_file_extension(config), None))
     verify_metadata(read, expected)
 
 
diff --git a/python/tox.ini b/python/tox.ini
index 7a600ab..78ed949 100644
--- a/python/tox.ini
+++ b/python/tox.ini
@@ -64,7 +64,7 @@ commands =
 skips = B104
 
 [flake8]
-ignore = E501
+ignore = E501,W503
 exclude =
     *.egg-info,
     *.pyc,