You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by dw...@apache.org on 2019/08/09 19:18:48 UTC
[incubator-iceberg] branch master updated: Moving/Renaming hadoop
module to filesystem (#277)
This is an automated email from the ASF dual-hosted git repository.
dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new b5446e4 Moving/Renaming hadoop module to filesystem (#277)
b5446e4 is described below
commit b5446e4f1dc8ac598db9d1fcae58d8b7369ee071
Author: TGooch44 <te...@gmail.com>
AuthorDate: Fri Aug 9 12:18:43 2019 -0700
Moving/Renaming hadoop module to filesystem (#277)
---
python/iceberg/core/avro/avro_schema_util.py | 14 +-
.../core/base_metastore_table_operations.py | 43 +++-
python/iceberg/core/data_files.py | 8 +-
.../core/{hadoop => filesystem}/__init__.py | 10 +-
.../core/{hadoop => filesystem}/file_status.py | 0
.../file_system.py} | 69 +++++-
.../core/filesystem/filesystem_table_operations.py | 140 +++++++++++
.../iceberg/core/filesystem/filesystem_tables.py | 53 +++++
.../{hadoop => filesystem}/local_filesystem.py | 65 +++---
python/iceberg/core/filesystem/s3_filesystem.py | 257 +++++++++++++++++++++
python/iceberg/core/{hadoop => filesystem}/util.py | 28 +--
python/iceberg/core/hadoop/file_system.py | 25 --
python/iceberg/core/hadoop/hadoop_output_file.py | 49 ----
.../iceberg/core/hadoop/hadoop_table_operations.py | 49 ----
.../iceberg/core/hadoop/s3_filesystem_wrapper.py | 54 -----
python/iceberg/core/table_properties.py | 77 ++++++
python/iceberg/core/util/__init__.py | 7 +-
python/iceberg/core/util/bin_packing.py | 1 +
python/iceberg/spark/__init__.py | 19 --
python/iceberg/spark/source/__init__.py | 19 --
python/iceberg/spark/source/spark_catalog.py | 53 -----
python/iceberg/spark/table_identifier.py | 21 --
python/setup.py | 7 +-
python/tests/core/test_table_metadata_parser.py | 7 +-
python/tox.ini | 2 +-
25 files changed, 696 insertions(+), 381 deletions(-)
diff --git a/python/iceberg/core/avro/avro_schema_util.py b/python/iceberg/core/avro/avro_schema_util.py
index efe7299..5d47890 100644
--- a/python/iceberg/core/avro/avro_schema_util.py
+++ b/python/iceberg/core/avro/avro_schema_util.py
@@ -6,14 +6,14 @@
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
class AvroSchemaUtil(object):
FIELD_ID_PROP = "field-id"
diff --git a/python/iceberg/core/base_metastore_table_operations.py b/python/iceberg/core/base_metastore_table_operations.py
index b536110..a50f342 100644
--- a/python/iceberg/core/base_metastore_table_operations.py
+++ b/python/iceberg/core/base_metastore_table_operations.py
@@ -18,13 +18,11 @@
import logging
import uuid
-from iceberg.core.hadoop import (get_fs,
- HadoopInputFile,
- HadoopOutputFile)
from retrying import retry
from .table_metadata_parser import TableMetadataParser
from .table_operations import TableOperations
+from .table_properties import TableProperties
_logger = logging.getLogger(__name__)
@@ -46,6 +44,7 @@ class BaseMetastoreTableOperations(TableOperations):
self.current_metadata = None
self.current_metadata_location = None
self.base_location = None
+ self.should_refresh = True
self.version = -1
def current(self):
@@ -59,13 +58,18 @@ class BaseMetastoreTableOperations(TableOperations):
return "{base_location}/{data}".format(base_location=self.base_location,
data=BaseMetastoreTableOperations.DATA_FOLDER_NAME)
+ def request_refresh(self):
+ self.should_refresh = True
+
def write_new_metadata(self, metadata, version):
+ from .filesystem import FileSystemOutputFile
+
if self.base_location is None:
self.base_location = metadata.location
new_filename = BaseMetastoreTableOperations.new_table_metadata_filename(self.base_location,
version)
- new_metadata_location = HadoopOutputFile.from_path(new_filename, self.conf)
+ new_metadata_location = FileSystemOutputFile.from_path(new_filename, self.conf)
TableMetadataParser.write(metadata, new_metadata_location)
return new_filename
@@ -73,24 +77,43 @@ class BaseMetastoreTableOperations(TableOperations):
def refresh_from_metadata_location(self, new_location, num_retries=20):
if not self.current_metadata_location == new_location:
_logger.info("Refreshing table metadata from new version: %s" % new_location)
+ self.retryable_refresh(new_location)
- self.retryable_refresh(new_location)
+ self.should_refresh = False
def new_input_file(self, path):
- return HadoopInputFile.from_location(path, self.conf)
+ from .filesystem import FileSystemInputFile
+
+ return FileSystemInputFile.from_location(path, self.conf)
def new_metadata_file(self, filename):
- return HadoopOutputFile.from_path(BaseMetastoreTableOperations.new_metadata_location(self.base_location,
- filename),
- self.conf)
+ from .filesystem import FileSystemOutputFile
+
+ return FileSystemOutputFile.from_path(BaseMetastoreTableOperations.new_metadata_location(self.base_location,
+ filename),
+ self.conf)
+
+ def metadata_file_location(self, file_name, metadata=None):
+ if metadata is None:
+ return self.metadata_file_location(file_name, metadata=self.current())
+
+ metadata_location = metadata.properties.get(TableProperties.WRITE_METADATA_LOCATION)
+
+ if metadata_location is not None:
+ return "{}/{}".format(metadata_location, file_name)
+ else:
+ return "{}/{}/{}".format(metadata.location, BaseMetastoreTableOperations.METADATA_FOLDER_NAME, file_name)
def delete_file(self, path):
+ from .filesystem import get_fs
get_fs(path, self.conf).delete(path, False)
@retry(wait_incrementing_start=100, wait_exponential_multiplier=4,
wait_exponential_max=5000, stop_max_delay=600000, stop_max_attempt_number=2)
def retryable_refresh(self, location):
- self.current_metadata = TableMetadataParser.read(self, HadoopInputFile.from_location(location, self.conf))
+ from .filesystem import FileSystemInputFile
+
+ self.current_metadata = TableMetadataParser.read(self, FileSystemInputFile.from_location(location, self.conf))
self.current_metadata_location = location
self.base_location = self.current_metadata.location
self.version = BaseMetastoreTableOperations.parse_version(location)
diff --git a/python/iceberg/core/data_files.py b/python/iceberg/core/data_files.py
index 34d4422..0a50e5b 100644
--- a/python/iceberg/core/data_files.py
+++ b/python/iceberg/core/data_files.py
@@ -20,8 +20,8 @@ from iceberg.api import (FileFormat,
Metrics)
from iceberg.api.types import Conversions
+from .filesystem import FileSystemInputFile
from .generic_data_file import GenericDataFile
-from .hadoop import HadoopInputFile
from .partition_data import PartitionData
@@ -78,7 +78,7 @@ class DataFiles(object):
@staticmethod
def from_input_file(input_file, row_count, partition_data=None, metrics=None):
- if isinstance(input_file, HadoopInputFile):
+ if isinstance(input_file, FileSystemInputFile):
return DataFiles.from_stat(input_file.get_stat(), row_count,
partition_data=partition_data, metrics=metrics)
@@ -124,6 +124,7 @@ class DataFileBuilder(object):
self.null_value_counts = None
self.lower_bounds = None
self.upper_bounds = None
+ return self
def copy(self, to_copy):
if self.is_partitioned:
@@ -148,7 +149,7 @@ class DataFileBuilder(object):
return self
def with_input_file(self, input_file):
- if isinstance(input_file, HadoopInputFile):
+ if isinstance(input_file, FileSystemInputFile):
self.with_status(input_file.get_stat())
self.file_path = self.location()
@@ -158,6 +159,7 @@ class DataFileBuilder(object):
def with_path(self, path):
self.file_path = path
+ return self
def with_format(self, fmt):
if isinstance(fmt, FileFormat):
diff --git a/python/iceberg/core/hadoop/__init__.py b/python/iceberg/core/filesystem/__init__.py
similarity index 63%
rename from python/iceberg/core/hadoop/__init__.py
rename to python/iceberg/core/filesystem/__init__.py
index 2ac455f..b836e2f 100644
--- a/python/iceberg/core/hadoop/__init__.py
+++ b/python/iceberg/core/filesystem/__init__.py
@@ -15,8 +15,12 @@
# specific language governing permissions and limitations
# under the License.
-__all__ = ["get_fs", "HadoopInputFile", "HadoopOutputFile"]
+__all__ = ["get_fs", "FileStatus", "FileSystem", "FileSystemInputFile", "FileSystemOutputFile",
+ "FilesystemTableOperations", "FilesystemTables", "S3File", "S3FileSystem"]
-from .hadoop_input_file import HadoopInputFile
-from .hadoop_output_file import HadoopOutputFile
+from .file_status import FileStatus
+from .file_system import FileSystem, FileSystemInputFile, FileSystemOutputFile
+from .filesystem_table_operations import FilesystemTableOperations
+from .filesystem_tables import FilesystemTables
+from .s3_filesystem import S3File, S3FileSystem
from .util import get_fs
diff --git a/python/iceberg/core/hadoop/file_status.py b/python/iceberg/core/filesystem/file_status.py
similarity index 100%
rename from python/iceberg/core/hadoop/file_status.py
rename to python/iceberg/core/filesystem/file_status.py
diff --git a/python/iceberg/core/hadoop/hadoop_input_file.py b/python/iceberg/core/filesystem/file_system.py
similarity index 50%
rename from python/iceberg/core/hadoop/hadoop_input_file.py
rename to python/iceberg/core/filesystem/file_system.py
index 66049dd..1ed2029 100644
--- a/python/iceberg/core/hadoop/hadoop_input_file.py
+++ b/python/iceberg/core/filesystem/file_system.py
@@ -17,12 +17,33 @@
import gzip
-from iceberg.api.io import InputFile
+from iceberg.api.io import InputFile, OutputFile
from .util import get_fs
-class HadoopInputFile(InputFile):
+class FileSystem(object):
+
+ def open(self, path, mode='rb'):
+ raise NotImplementedError()
+
+ def create(self, path, overwrite=False):
+ raise NotImplementedError()
+
+ def exists(self, path):
+ raise NotImplementedError()
+
+ def delete(self, path):
+ raise NotImplementedError()
+
+ def stat(self, path):
+ raise NotImplementedError()
+
+ def rename(self, src, dest):
+ raise NotImplementedError()
+
+
+class FileSystemInputFile(InputFile):
def __init__(self, fs, path, conf, length=None, stat=None):
self.fs = fs
@@ -34,7 +55,7 @@ class HadoopInputFile(InputFile):
@staticmethod
def from_location(location, conf):
fs = get_fs(location, conf)
- return HadoopInputFile(fs, location, conf)
+ return FileSystemInputFile(fs, location, conf)
def location(self):
return self.path
@@ -57,5 +78,43 @@ class HadoopInputFile(InputFile):
for line in fo:
yield line
- def new_fo(self):
- return self.fs.open(self.location())
+ def new_fo(self, mode="rb"):
+ return self.fs.open(self.location(), mode=mode)
+
+ def __repr__(self):
+ return "FileSystemInputFile({})".format(self.path)
+
+ def __str__(self):
+ return self.__repr__()
+
+
+class FileSystemOutputFile(OutputFile):
+
+ @staticmethod
+ def from_path(path, conf):
+ return FileSystemOutputFile(path, conf)
+
+ def __init__(self, path, conf):
+ self.path = path
+ self.conf = conf
+
+ def create(self, mode="w"):
+ fs = get_fs(self.path, self.conf)
+ if fs.exists(self.path):
+ raise RuntimeError("File %s already exists" % self.path)
+
+ return fs.open(self.path, mode=mode)
+
+ def create_or_overwrite(self):
+ fs = get_fs(self.path, self.conf)
+
+ return fs.open(self.path, "wb")
+
+ def location(self):
+ return str(self.path)
+
+ def __repr__(self):
+ return "FileSystemOutputFile({})".format(self.path)
+
+ def __str__(self):
+ return self.__repr__()
diff --git a/python/iceberg/core/filesystem/filesystem_table_operations.py b/python/iceberg/core/filesystem/filesystem_table_operations.py
new file mode 100644
index 0000000..1215654
--- /dev/null
+++ b/python/iceberg/core/filesystem/filesystem_table_operations.py
@@ -0,0 +1,140 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+from pathlib import Path
+import uuid
+
+from iceberg.exceptions import CommitFailedException, ValidationException
+
+from .file_system import FileSystemInputFile, FileSystemOutputFile
+from .util import get_fs
+from ..table_metadata_parser import TableMetadataParser
+from ..table_operations import TableOperations
+from ..table_properties import TableProperties
+
+_logger = logging.getLogger(__name__)
+
+
+class FilesystemTableOperations(TableOperations):
+
+ def __init__(self, location, conf):
+ self.conf = conf
+ self.location = Path(location)
+ self.should_refresh = True
+ self.version = None
+ self.current_metadata = None
+
+ def current(self):
+ if self.should_refresh:
+ return self.refresh()
+
+ return self.current_metadata
+
+ def refresh(self):
+ ver = self.version if self.version is not None else self.read_version_hint()
+ metadata_file = self.metadata_file(ver)
+ fs = get_fs(str(metadata_file), self.conf)
+
+ if ver is not None and not fs.exists(metadata_file):
+ if ver == 0:
+ return None
+ raise ValidationException("Metadata file is missing: %s" % metadata_file)
+
+ while fs.exists(str(self.metadata_file(ver + 1))):
+ ver += 1
+ metadata_file = self.metadata_file(ver)
+
+ self.version = ver
+ self.current_metadata = TableMetadataParser.read(self, FileSystemInputFile.from_location(str(metadata_file),
+ self.conf))
+ self.should_refresh = False
+ return self.current_metadata
+
+ def commit(self, base, metadata):
+ if base != self.current():
+ raise CommitFailedException("Cannot commit changes based on stale table metadata")
+
+ if not (base is None or base.location() == metadata.location()):
+ raise RuntimeError("Hadoop path-based tables cannot be relocated")
+ if TableProperties.WRITE_METADATA_LOCATION in metadata.properties:
+ raise RuntimeError("Hadoop path-based tables cannot be relocated")
+
+ temp_metadata_file = self.metadata_path("{}{}".format(uuid.uuid4(),
+ TableMetadataParser.get_file_extension(self.conf)))
+ TableMetadataParser.write(metadata, FileSystemOutputFile.from_path(str(temp_metadata_file), self.conf))
+
+ next_version = (self.version if self.version is not None else 0) + 1
+ final_metadata_file = self.metadata_file(next_version)
+ fs = get_fs(str(final_metadata_file), self.conf)
+
+ if fs.exists(final_metadata_file):
+ raise CommitFailedException("Version %s already exists: %s" % (next_version, final_metadata_file))
+
+ if not fs.rename(temp_metadata_file, final_metadata_file):
+ raise CommitFailedException("Failed to commit changes using rename: %s" % final_metadata_file)
+
+ self.write_version_hint(next_version)
+ self.should_refresh = True
+
+ def new_input_file(self, path):
+ return FileSystemInputFile.from_location(path, self.conf)
+
+ def new_output_file(self, path):
+ return FileSystemOutputFile.from_path(path, self.conf)
+
+ def new_metadata_file(self, filename):
+ raise RuntimeError("Not yet implemented")
+
+ def delete_file(self, path):
+ get_fs(path, self.conf).delete(path)
+
+ def new_snapshot_id(self):
+ raise RuntimeError("Not yet implemented")
+
+ def metadata_file_location(self, file):
+ return str(self.metadata_path(file))
+
+ def metadata_file(self, version):
+ return self.metadata_path("v{}{}".format(version, TableMetadataParser.get_file_extension(self.conf)))
+
+ def metadata_path(self, filename):
+ return self.location / Path("metadata") / Path(filename)
+
+ def version_hint_file(self):
+ return self.metadata_path("version-hint.text")
+
+ def read_version_hint(self):
+ version_hint_file = str(self.version_hint_file())
+ fs = get_fs(version_hint_file, self.conf)
+
+ if not fs.exists(version_hint_file):
+ return 0
+ else:
+ with fs.open(version_hint_file, "r") as fo:
+ return int(fo.readline().replace("\n", ""))
+
+ def write_version_hint(self, version):
+ version_hint_file = str(self.version_hint_file())
+ fs = get_fs(version_hint_file, self.conf)
+ try:
+
+ with fs.create(version_hint_file, True) as fo:
+ fo.write("{}".format(version))
+
+ except RuntimeError as e:
+ _logger.warning("Unable to update version hint: %s" % e)
diff --git a/python/iceberg/core/filesystem/filesystem_tables.py b/python/iceberg/core/filesystem/filesystem_tables.py
new file mode 100644
index 0000000..77bd751
--- /dev/null
+++ b/python/iceberg/core/filesystem/filesystem_tables.py
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from iceberg.api import Tables
+from iceberg.exceptions import NoSuchTableException
+
+from .filesystem_table_operations import FilesystemTableOperations
+from ..table_metadata import TableMetadata
+
+
+class FilesystemTables(Tables):
+
+ def __init__(self, conf=None):
+ self.conf = conf if conf is not None else dict()
+
+ def load(self, location):
+ from ..base_table import BaseTable
+ ops = self.new_table_ops(location)
+ if ops.current() is None:
+ raise NoSuchTableException("Table does not exist at location: %s" % location)
+
+ return BaseTable(ops, location)
+
+ def create(self, schema, table_identifier=None, spec=None, properties=None, location=None):
+ from ..base_table import BaseTable
+ spec, properties = super(FilesystemTables, self).default_args(spec, properties)
+ ops = self.new_table_ops(location)
+
+ metadata = TableMetadata.new_table_metadata(ops, schema, spec, location, properties)
+ ops.commit(None, metadata)
+
+ return BaseTable(ops, location)
+
+ def new_table_ops(self, location):
+ if location is None:
+ raise RuntimeError("location cannot be None")
+
+ return FilesystemTableOperations(location, self.conf)
diff --git a/python/iceberg/core/hadoop/local_filesystem.py b/python/iceberg/core/filesystem/local_filesystem.py
similarity index 63%
rename from python/iceberg/core/hadoop/local_filesystem.py
rename to python/iceberg/core/filesystem/local_filesystem.py
index aa55de9..826700a 100644
--- a/python/iceberg/core/hadoop/local_filesystem.py
+++ b/python/iceberg/core/filesystem/local_filesystem.py
@@ -15,8 +15,9 @@
# specific language governing permissions and limitations
# under the License.
+import errno
import os
-import stat
+from pathlib import Path
from .file_status import FileStatus
from .file_system import FileSystem
@@ -36,12 +37,23 @@ class LocalFileSystem(FileSystem):
LocalFileSystem.fs_inst = self
def open(self, path, mode='rb'):
+ open_path = Path(LocalFileSystem.fix_path(path))
- return open(LocalFileSystem.fix_path(path), mode=mode)
+ if "w" in mode and not open_path.parents[0].exists():
+ try:
+ open_path.parents[0].mkdir(parents=True)
+ except OSError as exc:
+ if exc.errno != errno.EEXIST:
+ raise
+
+ return open(open_path, mode=mode)
+
+ def delete(self, path):
+ raise NotImplementedError()
def stat(self, path):
st = os.stat(LocalFileSystem.fix_path(path))
- return FileStatus(path=path, length=st.st_size, is_dir=stat.S_ISDIR(st.st_mode),
+ return FileStatus(path=path, length=st.st_size, is_dir=os.stat.S_ISDIR(st.st_mode),
blocksize=st.st_blksize, modification_time=st.st_mtime, access_time=st.st_atime,
permission=st.st_mode, owner=st.st_uid, group=st.st_gid)
@@ -51,34 +63,19 @@ class LocalFileSystem(FileSystem):
path = str(path[7:])
return path
- # def ls(self, path):
- # return os.listdir(path)
- #
- # def delete(self, path, recursive=False):
- # return os.remove(path)
- #
-
- #
- # def rename(self, path, new_path):
- # return os.rename(path, new_path)
- #
- # def mkdir(self, path, create_parents=True):
- # if create_parents:
- # return os.makedirs(path)
- #
- # return os.mkdir(path)
- #
- # def exists(self, path):
- # return os.path.isfile(path)
- #
- # def isdir(self, path):
- # return os.path.isdir(path)
- #
- # def isfile(self, path):
- # return os.path.isfile(path)
- #
- # def _isfilestore(self):
- # return True
- #
- # def open(self, path, mode='rb'):
- # return open(path, mode=mode)
+ def create(self, path, overwrite=False):
+ if os.path.exists(path) and not overwrite:
+ raise RuntimeError("Path %s already exists" % path)
+
+ return open(path, "w")
+
+ def rename(self, src, dest):
+ try:
+ os.rename(src, dest)
+ except OSError:
+ return False
+
+ return True
+
+ def exists(self, path):
+ return os.path.exists(path)
diff --git a/python/iceberg/core/filesystem/s3_filesystem.py b/python/iceberg/core/filesystem/s3_filesystem.py
new file mode 100644
index 0000000..9423ee0
--- /dev/null
+++ b/python/iceberg/core/filesystem/s3_filesystem.py
@@ -0,0 +1,257 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import io
+import logging
+import re
+import time
+from urllib.parse import urlparse
+
+import boto3
+from botocore.credentials import RefreshableCredentials
+from botocore.exceptions import ClientError
+from botocore.session import get_session
+from retrying import retry
+
+from .file_status import FileStatus
+from .file_system import FileSystem
+
+_logger = logging.getLogger(__name__)
+
+
+S3_CLIENT = dict()
+BOTO_STS_CLIENT = boto3.client('sts')
+CONF = None
+ROLE_ARN = "default"
+AUTOREFRESH_SESSION = None
+
+
+
+@retry(wait_incrementing_start=100, wait_exponential_multiplier=4,
+ wait_exponential_max=5000, stop_max_delay=600000, stop_max_attempt_number=7)
+def get_s3(obj="resource"):
+ global AUTOREFRESH_SESSION
+ global ROLE_ARN
+ if ROLE_ARN not in S3_CLIENT:
+ S3_CLIENT[ROLE_ARN] = dict()
+
+ if ROLE_ARN == "default":
+ if AUTOREFRESH_SESSION is None:
+ AUTOREFRESH_SESSION = boto3.Session()
+ S3_CLIENT["default"]["resource"] = AUTOREFRESH_SESSION.resource('s3')
+ S3_CLIENT["default"]["client"] = AUTOREFRESH_SESSION.client('s3')
+ else:
+ if AUTOREFRESH_SESSION is None:
+ sess = get_session()
+ sess._credentials = RefreshableCredentials.create_from_metadata(metadata=refresh_sts_session_keys(),
+ refresh_using=refresh_sts_session_keys,
+ method="sts-assume-role")
+ AUTOREFRESH_SESSION = boto3.Session(botocore_session=sess)
+
+ S3_CLIENT[ROLE_ARN]["resource"] = AUTOREFRESH_SESSION.resource("s3")
+ S3_CLIENT[ROLE_ARN]["client"] = AUTOREFRESH_SESSION.client("s3")
+
+ return S3_CLIENT.get(ROLE_ARN, dict()).get(obj)
+
+
+def refresh_sts_session_keys():
+ params = {"RoleArn": ROLE_ARN,
+ "RoleSessionName": "iceberg_python_client_{}".format(int(time.time() * 1000.00))}
+
+ sts_creds = BOTO_STS_CLIENT.assume_role(**params).get("Credentials")
+ credentials = {"access_key": sts_creds.get("AccessKeyId"),
+ "secret_key": sts_creds.get("SecretAccessKey"),
+ "token": sts_creds.get("SessionToken"),
+ "expiry_time": sts_creds.get("Expiration").isoformat()}
+ return credentials
+
+
+def url_to_bucket_key_name_tuple(url):
+ parsed_url = urlparse(url)
+ return parsed_url.netloc, parsed_url.path[1:], parsed_url.path.split("/")[-1]
+
+
+class S3FileSystem(FileSystem):
+ fs_inst = None
+
+ @staticmethod
+ def get_instance():
+ if S3FileSystem.fs_inst is None:
+ S3FileSystem()
+ return S3FileSystem.fs_inst
+
+ def __init__(self):
+ if S3FileSystem.fs_inst is None:
+ S3FileSystem.fs_inst = self
+
+ def set_conf(self, conf):
+ global CONF
+
+ CONF = conf
+ self.set_role(CONF.get("hive.aws_iam_role", "default"))
+
+ def set_role(self, role):
+ global ROLE_ARN
+
+ if role is not None:
+ ROLE_ARN = role
+
+ def exists(self, path):
+ try:
+ self.info(path)
+ except ClientError as ce:
+ if ce.response['Error']['Code'] == "404":
+ return False
+ else:
+ raise
+
+ return True
+
+ def open(self, path, mode='rb'):
+ return S3File(path, mode=mode)
+
+ def delete(self, path):
+ bucket, key, _ = url_to_bucket_key_name_tuple(S3FileSystem.normalize_s3_path(path))
+ get_s3().Object(bucket_name=bucket,
+ key=key).delete()
+
+ def stat(self, path):
+ st = self.info(S3FileSystem.normalize_s3_path(path))
+
+ return FileStatus(path=path, length=st.get("ContentLength"), is_dir=False,
+ blocksize=None, modification_time=st.get("LastModified"), access_time=None,
+ permission=None, owner=None, group=None)
+
+ @staticmethod
+ def info(url):
+ bucket, key, _ = url_to_bucket_key_name_tuple(url)
+ return get_s3("client").head_object(Bucket=bucket,
+ Key=key)
+
+ @staticmethod
+ def normalize_s3_path(path):
+ return re.sub(r'^s3n://|s3a://', 's3://', path)
+
+
+class S3File(object):
+ MAX_CHUNK_SIZE = 4 * 1048576
+ MIN_CHUNK_SIZE = 2 * 65536
+
+ def __init__(self, path, mode="rb"):
+ self.path = path
+ bucket, key, name = url_to_bucket_key_name_tuple(S3FileSystem.normalize_s3_path(path))
+ self.curr_pos = 0
+ self.obj = (get_s3()
+ .Object(bucket_name=bucket,
+ key=key))
+ self.name = name
+ if mode.startswith("r"):
+ self.size = self.obj.content_length
+
+ self.isatty = False
+ self.closed = False
+
+ self.mode = mode
+
+ self.buffer_remote_reads = True
+
+ self.curr_buffer = None
+ self.curr_buffer_start = -1
+ self.curr_buffer_end = -1
+ self.buffer_reads = 0
+ self.buffer_hits = 0
+
+ self.chunk_size = self.MAX_CHUNK_SIZE
+
+ def close(self):
+ self.closed = True
+
+ def flush(self):
+ pass
+
+ def next(self):
+ return next(self.readline())
+
+ def read(self, n=0):
+ if self.curr_pos >= self.size:
+ return None
+ if self.buffer_remote_reads:
+ stream = self._read_from_buffer(n)
+ else:
+ if n <= 0:
+ n = self.size
+ stream = self.obj.get(Range='bytes={}-{}'.format(self.curr_pos,
+ self.size - self.curr_pos))['Body'].read()
+ else:
+ stream = self.obj.get(Range='bytes={}-{}'.format(self.curr_pos, self.curr_pos + n - 1))['Body'].read()
+
+ self.curr_pos = min(self.curr_pos + n, self.size)
+ return stream
+
+ def _read_from_buffer(self, n):
+ self.buffer_reads += 1
+ # if the buffer is none or if the entire read is not contained
+ # in our current buffer fill the buffer
+ if self.curr_buffer is None or not (self.curr_buffer_start <= self.curr_pos
+ and self.curr_pos + n < self.curr_buffer_end):
+ self.curr_buffer_start = self.curr_pos
+ self.curr_buffer_end = self.curr_pos + max(self.chunk_size, n)
+ byte_range = 'bytes={}-{}'.format(self.curr_buffer_start,
+ self.curr_buffer_end)
+ self.curr_buffer = io.BytesIO(self.obj.get(Range=byte_range)['Body'].read())
+ else:
+ self.buffer_hits += 1
+
+ # seek to the right position if we aren't at the start of the buffer
+ if self.curr_buffer_start != self.curr_pos:
+ self.curr_buffer.seek(self.curr_pos - self.curr_buffer_start)
+
+ return self.curr_buffer.read(n)
+
+ def readline(self, n=0):
+ if self.curr_buffer is None:
+ self.curr_buffer = io.BytesIO(self.obj.get()['Body'].read())
+ for line in self.curr_buffer:
+ yield line
+
+ def seek(self, offset, whence=0):
+ if whence == 0:
+ self.curr_pos = offset
+ elif whence == 1:
+ self.curr_pos += offset
+ elif whence == 2:
+ self.curr_pos = self.size + offset
+
+ def tell(self):
+ return self.curr_pos
+
+ def write(self, string):
+ resp = self.obj.put(Body=string)
+ if not resp.get("ResponseMetadata", dict()).get("HTTPStatusCode") == 200:
+ raise RuntimeError("Unable to write to {}".format(self.path))
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.close()
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ return self.next()
diff --git a/python/iceberg/core/hadoop/util.py b/python/iceberg/core/filesystem/util.py
similarity index 60%
rename from python/iceberg/core/hadoop/util.py
rename to python/iceberg/core/filesystem/util.py
index 8516e1e..d73dc11 100644
--- a/python/iceberg/core/hadoop/util.py
+++ b/python/iceberg/core/filesystem/util.py
@@ -15,16 +15,13 @@
# specific language governing permissions and limitations
# under the License.
-import time
from urllib.parse import urlparse
-import boto3
-
-from .local_filesystem import LocalFileSystem
-from .s3_filesystem_wrapper import S3FileSystemWrapper
-
def get_fs(path, conf, local_only=False):
+ from .local_filesystem import LocalFileSystem
+ from .s3_filesystem import S3FileSystem
+
if local_only:
return LocalFileSystem.get_instance()
else:
@@ -32,22 +29,11 @@ def get_fs(path, conf, local_only=False):
if parsed_path.scheme in ["", "file"]:
return LocalFileSystem.get_instance()
- elif parsed_path.scheme in ["s3", "s3n"]:
- if conf.get("hive.aws_iam_role") is not None:
- key, secret, token = get_sts_session_keys(conf.get("hive.aws_iam_role"))
- return S3FileSystemWrapper(key=key, secret=secret, token=token)
- return S3FileSystemWrapper()
+ elif parsed_path.scheme in ["s3", "s3n", "s3a"]:
+ fs = S3FileSystem.get_instance()
+ fs.set_conf(conf)
+ return fs
elif parsed_path.scheme in ["hdfs"]:
raise RuntimeError("Hadoop FS not implemented")
raise RuntimeError("No filesystem found for this location: %s" % path)
-
-
-def get_sts_session_keys(role_arn):
- client = boto3.client('sts')
- response = client.assume_role(
- RoleArn=role_arn,
- RoleSessionName='iceberg_python_client_{}'.format(int(time.time())))
- return (response["Credentials"]["AccessKeyId"],
- response["Credentials"]["SecretAccessKey"],
- response["Credentials"]["SessionToken"])
diff --git a/python/iceberg/core/hadoop/file_system.py b/python/iceberg/core/hadoop/file_system.py
deleted file mode 100644
index 2630227..0000000
--- a/python/iceberg/core/hadoop/file_system.py
+++ /dev/null
@@ -1,25 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-
-class FileSystem(object):
-
- def open(self, path, mode='rb'):
- raise NotImplementedError()
-
- def stat(self, path):
- raise NotImplementedError()
diff --git a/python/iceberg/core/hadoop/hadoop_output_file.py b/python/iceberg/core/hadoop/hadoop_output_file.py
deleted file mode 100644
index 4c0e243..0000000
--- a/python/iceberg/core/hadoop/hadoop_output_file.py
+++ /dev/null
@@ -1,49 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from iceberg.api.io import OutputFile
-
-from .util import get_fs
-
-
-class HadoopOutputFile(OutputFile):
-
- @staticmethod
- def from_path(path, conf):
- return HadoopOutputFile(path, conf)
-
- def __init__(self, path, conf):
- self.path = path
- self.conf = conf
-
- def create(self):
- fs = get_fs(self.path, self.conf)
- if fs.exists():
- raise RuntimeError("File %s already exists" % self.path)
-
- return fs.open(self.path)
-
- def create_or_overwrite(self):
- fs = get_fs(self.path, self.conf)
-
- return fs.open(self.path, "wb")
-
- def location(self):
- return str(self.path)
-
- def __str__(self):
- return self.location()
diff --git a/python/iceberg/core/hadoop/hadoop_table_operations.py b/python/iceberg/core/hadoop/hadoop_table_operations.py
deleted file mode 100644
index e771084..0000000
--- a/python/iceberg/core/hadoop/hadoop_table_operations.py
+++ /dev/null
@@ -1,49 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import iceberg.core as core
-
-
-class HadoopTableOperations(core.TableOperations):
-
- def __init__(self, location, conf):
- self.conf = conf
- self.location = location
- self.should_refresh = True
- self.verison = None
- self.current_metadata = None
-
- def current(self):
- pass
-
- def refresh(self):
- raise RuntimeError("Not yet implemented")
-
- def commit(self, base, metadata):
- raise RuntimeError("Not yet implemented")
-
- def new_input_file(self, path):
- raise RuntimeError("Not yet implemented")
-
- def new_metadata_file(self, filename):
- raise RuntimeError("Not yet implemented")
-
- def delete_file(self, path):
- raise RuntimeError("Not yet implemented")
-
- def new_snapshot_id(self):
- raise RuntimeError("Not yet implemented")
diff --git a/python/iceberg/core/hadoop/s3_filesystem_wrapper.py b/python/iceberg/core/hadoop/s3_filesystem_wrapper.py
deleted file mode 100644
index f1fd713..0000000
--- a/python/iceberg/core/hadoop/s3_filesystem_wrapper.py
+++ /dev/null
@@ -1,54 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import s3fs
-
-from .file_status import FileStatus
-from .file_system import FileSystem
-
-
-class S3FileSystemWrapper(FileSystem):
-
- def __init__(self, anon=False, key=None, secret=None, token=None):
- self._fs = s3fs.S3FileSystem(anon=anon, key=key, secret=secret, token=token)
-
- def open(self, path, mode='rb'):
- return self._fs.open(S3FileSystemWrapper.fix_s3_path(path), mode=mode)
-
- def stat(self, path):
- is_dir = False
- st = dict()
- try:
- st = self._fs.info(S3FileSystemWrapper.fix_s3_path(path), refresh=True)
- except RuntimeError:
- # must be a directory or subsequent du will fail
- du = self._fs.du(S3FileSystemWrapper.fix_s3_path(path), refresh=True)
- if len(du) > 0:
- is_dir = True
- length = 0
- for key, val in du.items():
- length += val
-
- return FileStatus(path=path, length=st.get("Size", length), is_dir=is_dir,
- blocksize=None, modification_time=st.get("LastModified"), access_time=None,
- permission=None, owner=None, group=None)
-
- @staticmethod
- def fix_s3_path(path):
- if path.startswith("s3n"):
- path = "s3" + str(path[3:])
- return path
diff --git a/python/iceberg/core/table_properties.py b/python/iceberg/core/table_properties.py
new file mode 100644
index 0000000..0511142
--- /dev/null
+++ b/python/iceberg/core/table_properties.py
@@ -0,0 +1,77 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+
+class TableProperties(object):
+ COMMIT_NUM_RETRIES = "commit.retry.num-retries"
+ COMMIT_NUM_RETRIES_DEFAULT = 4
+
+ COMMIT_MIN_RETRY_WAIT_MS = "commit.retry.min-wait-ms"
+ COMMIT_MIN_RETRY_WAIT_MS_DEFAULT = 100
+
+ COMMIT_MAX_RETRY_WAIT_MS = "commit.retry.max-wait-ms"
+ COMMIT_MAX_RETRY_WAIT_MS_DEFAULT = 60000
+
+ COMMIT_TOTAL_RETRY_TIME_MS = "commit.retry.total-timeout-ms"
+ COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT = 1800000
+
+ MANIFEST_TARGET_SIZE_BYTES = "commit.manifest.target-size-bytes"
+ MANIFEST_TARGET_SIZE_BYTES_DEFAULT = 8388608
+
+ MANIFEST_MIN_MERGE_COUNT = "commit.manifest.min-count-to-merge"
+ MANIFEST_MIN_MERGE_COUNT_DEFAULT = 100
+
+ DEFAULT_FILE_FORMAT = "write.format.default"
+ DEFAULT_FILE_FORMAT_DEFAULT = "parquet"
+
+ PARQUET_ROW_GROUP_SIZE_BYTES = "write.parquet.row-group-size-bytes"
+ PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT = "134217728"
+
+ PARQUET_PAGE_SIZE_BYTES = "write.parquet.page-size-bytes"
+ PARQUET_PAGE_SIZE_BYTES_DEFAULT = "1048576"
+
+ PARQUET_DICT_SIZE_BYTES = "write.parquet.dict-size-bytes"
+ PARQUET_DICT_SIZE_BYTES_DEFAULT = "2097152"
+
+ PARQUET_COMPRESSION = "write.parquet.compression-codec"
+ PARQUET_COMPRESSION_DEFAULT = "gzip"
+
+ AVRO_COMPRESSION = "write.avro.compression-codec"
+ AVRO_COMPRESSION_DEFAULT = "gzip"
+
+ SPLIT_SIZE = "read.split.target-size"
+ SPLIT_SIZE_DEFAULT = 134217728
+
+ SPLIT_LOOKBACK = "read.split.planning-lookback"
+ SPLIT_LOOKBACK_DEFAULT = 10
+
+ SPLIT_OPEN_FILE_COST = "read.split.open-file-cost"
+ SPLIT_OPEN_FILE_COST_DEFAULT = 4 * 1024 * 1024
+
+ OBJECT_STORE_ENABLED = "write.object-storage.enabled"
+ OBJECT_STORE_ENABLED_DEFAULT = False
+
+ OBJECT_STORE_PATH = "write.object-storage.path"
+
+ WRITE_NEW_DATA_LOCATION = "write.folder-storage.path"
+
+ WRITE_METADATA_LOCATION = "write.metadata.path"
+
+ MANIFEST_LISTS_ENABLED = "write.manifest-lists.enabled"
+
+ MANIFEST_LISTS_ENABLED_DEFAULT = True
diff --git a/python/iceberg/core/util/__init__.py b/python/iceberg/core/util/__init__.py
index 5df92bc..ba17648 100644
--- a/python/iceberg/core/util/__init__.py
+++ b/python/iceberg/core/util/__init__.py
@@ -15,6 +15,11 @@
# specific language governing permissions and limitations
# under the License.
-__all__ = ["AtomicInteger"]
+__all__ = ["AtomicInteger", "PackingIterator", "str_as_bool"]
from .atomic_integer import AtomicInteger
+from .bin_packing import PackingIterator
+
+
+def str_as_bool(str_var):
+ return str_var is not None and str_var.lower() == "true"
diff --git a/python/iceberg/core/util/bin_packing.py b/python/iceberg/core/util/bin_packing.py
index 1ccca2e..2b7f0e1 100644
--- a/python/iceberg/core/util/bin_packing.py
+++ b/python/iceberg/core/util/bin_packing.py
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
+
from copy import deepcopy
diff --git a/python/iceberg/spark/__init__.py b/python/iceberg/spark/__init__.py
deleted file mode 100644
index ede72c0..0000000
--- a/python/iceberg/spark/__init__.py
+++ /dev/null
@@ -1,19 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# flake8: noqa
-
-from .table_identifier import TableIdentifier
diff --git a/python/iceberg/spark/source/__init__.py b/python/iceberg/spark/source/__init__.py
deleted file mode 100644
index cc91ef4..0000000
--- a/python/iceberg/spark/source/__init__.py
+++ /dev/null
@@ -1,19 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# flake8: noqa
-
-from .spark_catalog import SparkCatalog
diff --git a/python/iceberg/spark/source/spark_catalog.py b/python/iceberg/spark/source/spark_catalog.py
deleted file mode 100644
index 858be62..0000000
--- a/python/iceberg/spark/source/spark_catalog.py
+++ /dev/null
@@ -1,53 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-class SparkCatalog(object):
-
- def create(self, ident, schema, spec, properties):
- pass
-
- def load(self, ident):
- pass
-
- def drop(self, ident):
- pass
-
- def load_table(self, ident):
- pass
-
- def create_table(self, ident, table_type, partitions, properties):
- pass
-
- def apply_property_changes(self, table, changes):
- pass
-
- def apply_schema_changes(self, table, changes):
- pass
-
- def drop_table(self, ident):
- pass
-
- def initialize(self, name, options):
- self.name = name
- self.options = options
-
- def convert(self, schema, partitioning):
- pass
-
- def __init__(self):
- self.name = ""
- self.options = None
diff --git a/python/iceberg/spark/table_identifier.py b/python/iceberg/spark/table_identifier.py
deleted file mode 100644
index 84d04f1..0000000
--- a/python/iceberg/spark/table_identifier.py
+++ /dev/null
@@ -1,21 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-class TableIdentifier(object):
-
- def __init__(self):
- pass
diff --git a/python/setup.py b/python/setup.py
index 5a57101..20f01f2 100755
--- a/python/setup.py
+++ b/python/setup.py
@@ -26,12 +26,13 @@ setup(
keywords='iceberg',
url='https://github.com/apache/incubator-iceberg/blob/master/README.md',
python_requires='>=3.4',
- install_requires=['boto3',
+ install_requires=['botocore',
+ 'boto3',
'fastavro',
'mmh3',
'python-dateutil',
'pytz',
- 'retrying',
- 's3fs'],
+ 'requests',
+ 'retrying'],
setup_requires=['setupmeta']
)
diff --git a/python/tests/core/test_table_metadata_parser.py b/python/tests/core/test_table_metadata_parser.py
index d9d3b28..9c8ad2c 100644
--- a/python/tests/core/test_table_metadata_parser.py
+++ b/python/tests/core/test_table_metadata_parser.py
@@ -17,18 +17,17 @@
import binascii
-from iceberg.api import Files
from iceberg.core import ConfigProperties, TableMetadataParser
-from iceberg.core.hadoop import HadoopInputFile
+from iceberg.core.filesystem import FileSystemInputFile, FileSystemOutputFile
def test_compression_property(expected, prop):
config = {ConfigProperties.COMPRESS_METADATA: prop}
- output_file = Files.local_output(TableMetadataParser.get_file_extension(config))
+ output_file = FileSystemOutputFile.from_path(TableMetadataParser.get_file_extension(config), dict)
TableMetadataParser.write(expected, output_file)
assert prop == is_compressed(TableMetadataParser.get_file_extension(config))
read = TableMetadataParser.read(None,
- HadoopInputFile.from_location(TableMetadataParser.get_file_extension(config), None))
+ FileSystemInputFile.from_location(TableMetadataParser.get_file_extension(config), None))
verify_metadata(read, expected)
diff --git a/python/tox.ini b/python/tox.ini
index 7a600ab..78ed949 100644
--- a/python/tox.ini
+++ b/python/tox.ini
@@ -64,7 +64,7 @@ commands =
skips = B104
[flake8]
-ignore = E501
+ignore = E501,W503
exclude =
*.egg-info,
*.pyc,