You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2020/04/09 03:20:54 UTC

[incubator-iceberg] branch master updated: Add Java code examples and update site docs (#678)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 634e149  Add Java code examples and update site docs (#678)
634e149 is described below

commit 634e14972f25666b22af9cdab1718eef2831e9e4
Author: cmathiesen <t-...@hotels.com>
AuthorDate: Thu Apr 9 04:20:44 2020 +0100

    Add Java code examples and update site docs (#678)
---
 site/docs/api-quickstart.md                        |   2 +-
 site/docs/evolution.md                             |   5 +
 site/docs/getting-started.md                       |  63 ++++++-
 site/docs/img/partition-spec-evolution.png         | Bin 0 -> 224020 bytes
 .../{api-quickstart.md => java-api-quickstart.md}  |  91 +++++-----
 site/mkdocs.yml                                    |  11 +-
 .../apache/iceberg/examples/ConcurrencyTest.java   | 127 ++++++++++++++
 .../java/org/apache/iceberg/examples/README.md     | 194 +++++++++++++++++++++
 .../iceberg/examples/ReadAndWriteTablesTest.java   | 164 +++++++++++++++++
 .../iceberg/examples/SchemaEvolutionTest.java      | 173 ++++++++++++++++++
 .../org/apache/iceberg/examples/SimpleRecord.java  |  80 +++++++++
 .../examples/SnapshotFunctionalityTest.java        | 153 ++++++++++++++++
 spark/src/test/resources/data/books.json           |   6 +
 spark/src/test/resources/data/new-books.json       |   4 +
 14 files changed, 1018 insertions(+), 55 deletions(-)

diff --git a/site/docs/api-quickstart.md b/site/docs/api-quickstart.md
index 4e369d3..00f7f35 100644
--- a/site/docs/api-quickstart.md
+++ b/site/docs/api-quickstart.md
@@ -15,7 +15,7 @@
  - limitations under the License.
  -->
 
-# API Quickstart
+# Spark API Quickstart
 
 ## Create a table
 
diff --git a/site/docs/evolution.md b/site/docs/evolution.md
index 37066ef..343b77d 100644
--- a/site/docs/evolution.md
+++ b/site/docs/evolution.md
@@ -54,6 +54,11 @@ Iceberg uses unique IDs to track each column in a table. When you add a column,
 
 Iceberg table partitioning can be updated in an existing table because queries do not reference partition values directly.
 
+When you evolve a partition spec, the old data written with an earlier spec remains unchanged. New data is written using the new spec in a new layout. Metadata for each of the partition versions is kept separately. Because of this, when you start writing queries, you get split planning. This is where each partition layout plans files separately using the filter it derives for that specific partition layout. Here's a visual representation of a contrived example: 
+
+![Partition evolution diagram](img/partition-spec-evolution.png)
+*The data for 2008 is partitioned by month. Starting from 2009 the table is updated so that the data is instead partitioned by day. Both partitioning layouts are able to coexist in the same table.*
+
 Iceberg uses [hidden partitioning](../partitioning), so you don't *need* to write queries for a specific partition layout to be fast. Instead, you can write queries that select the data you need, and Iceberg automatically prunes out files that don't contain matching data.
 
 Partition evolution is a metadata operation and does not eagerly rewrite files.
diff --git a/site/docs/getting-started.md b/site/docs/getting-started.md
index 12db7ba..4fe55ee 100644
--- a/site/docs/getting-started.md
+++ b/site/docs/getting-started.md
@@ -15,4 +15,65 @@
  - limitations under the License.
  -->
 
-## Getting Started
+# Getting Started
+
+## Including Iceberg 
+
+### Downloads
+
+The latest version of Iceberg is [0.7.0-incubating](https://github.com/apache/incubator-iceberg/releases/tag/apache-iceberg-0.7.0-incubating).
+
+* [0.7.0-incubating source tar.gz](https://www.apache.org/dyn/closer.cgi/incubator/iceberg/apache-iceberg-0.7.0-incubating/apache-iceberg-0.7.0-incubating.tar.gz) -- [signature](https://dist.apache.org/repos/dist/release/incubator/iceberg/apache-iceberg-0.7.0-incubating/apache-iceberg-0.7.0-incubating.tar.gz.asc) -- [sha512](https://dist.apache.org/repos/dist/release/incubator/iceberg/apache-iceberg-0.7.0-incubating/apache-iceberg-0.7.0-incubating.tar.gz.sha512)
+* [0.7.0-incubating Spark 2.4 runtime Jar](https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark-runtime/0.7.0-incubating/iceberg-spark-runtime-0.7.0-incubating.jar)
+
+One way to use Iceberg in Spark 2.4 is to download the runtime Jar and add it to the jars folder of your Spark install.
+
+Spark 2.4 is limited to reading and writing existing Iceberg tables. Use the [Iceberg API](../api) to create Iceberg tables.
+
+The recommended way is to include Iceberg's latest release using the `--packages` option:
+```sh
+spark-shell --packages org.apache.iceberg:iceberg-spark-runtime:0.7.0-incubating
+```
+
+You can also build Iceberg locally, and add the jar to Spark's classpath. This can be helpful to test unreleased features or while developing something new:
+
+```sh
+./gradlew assemble
+
+spark-shell --jars spark-runtime/build/libs/iceberg-spark-runtime-93990904.jar
+```
+
+Where you have to replace `93990904` with the git hash that you're using.
+
+### Gradle
+To add a dependency on Iceberg in Gradle, add the following to `build.gradle`:
+```
+dependencies {
+  compile 'org.apache.iceberg:iceberg-core:0.7.0-incubating'
+}
+```
+
+### Maven 
+If you'd like to try out Iceberg in a Maven project using the Spark Iceberg API, you can add the `iceberg-spark-runtime` dependency to your `pom.xml` file:
+```xml
+   <dependency>
+     <groupId>org.apache.iceberg</groupId>
+     <artifactId>iceberg-spark-runtime</artifactId>
+     <version>${iceberg.version}</version>
+   </dependency>
+```
+
+You'll also need `spark-sql` to read tables:
+```xml
+  <dependency> 
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-sql_2.11</artifactId>
+    <version>2.4.4</version>
+  </dependency>
+```
+
+### Using the API 
+For examples on how to use the Iceberg API see:
+
+- [Spark](api-quickstart.md)
+- [Java](java-api-quickstart.md)
diff --git a/site/docs/img/partition-spec-evolution.png b/site/docs/img/partition-spec-evolution.png
new file mode 100644
index 0000000..0bc595f
Binary files /dev/null and b/site/docs/img/partition-spec-evolution.png differ
diff --git a/site/docs/api-quickstart.md b/site/docs/java-api-quickstart.md
similarity index 66%
copy from site/docs/api-quickstart.md
copy to site/docs/java-api-quickstart.md
index 4e369d3..8bcf080 100644
--- a/site/docs/api-quickstart.md
+++ b/site/docs/java-api-quickstart.md
@@ -15,7 +15,7 @@
  - limitations under the License.
  -->
 
-# API Quickstart
+# Java API Quickstart
 
 ## Create a table
 
@@ -25,47 +25,41 @@ Tables are created using either a [`Catalog`](/javadoc/master/index.html?org/apa
 
 The Hive catalog connects to a Hive MetaStore to keep track of Iceberg tables. This example uses Spark's Hadoop configuration to get a Hive catalog:
 
-```scala
-import org.apache.iceberg.hive.HiveCatalog
+```java
+import org.apache.iceberg.hive.HiveCatalog;
 
-val catalog = new HiveCatalog(spark.sessionState.newHadoopConf())
+Catalog catalog = new HiveCatalog(spark.sparkContext().hadoopConfiguration());
 ```
 
 The `Catalog` interface defines methods for working with tables, like `createTable`, `loadTable`, `renameTable`, and `dropTable`.
 
 To create a table, pass an `Identifier` and a `Schema` along with other initial metadata:
 
-```scala
-val name = TableIdentifier.of("logging", "logs")
-val table = catalog.createTable(name, schema, spec)
+```java
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
 
-// write into the new logs table with Spark 2.4
-logsDF.write
-    .format("iceberg")
-    .mode("append")
-    .save("logging.logs")
+TableIdentifier name = TableIdentifier.of("logging", "logs");
+Table table = catalog.createTable(name, schema, spec);
 ```
 
 The logs [schema](#create-a-schema) and [partition spec](#create-a-partition-spec) are created below.
 
+
 ### Using Hadoop tables
 
 Iceberg also supports tables that are stored in a directory in HDFS or the local file system. Directory tables don't support all catalog operations, like rename, so they use the `Tables` interface instead of `Catalog`.
 
 To create a table in HDFS, use `HadoopTables`:
 
-```scala
-import org.apache.iceberg.hadoop.HadoopTables
-
-val tables = new HadoopTables(spark.sessionState.newHadoopConf())
-
-val table = tables.create(schema, spec, "hdfs:/tables/logging/logs")
+```java
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.Table;
 
-// write into the new logs table with Spark 2.4
-logsDF.write
-    .format("iceberg")
-    .mode("append")
-    .save("hdfs:/tables/logging/logs")
+Configuration conf = new Configuration():
+HadoopTables tables = new HadoopTables(conf);
+Table table = tables.create(schema, spec, table_location);
 ```
 
 !!! Warning
@@ -88,16 +82,16 @@ To read and write to tables from Spark see:
 
 This example creates a schema for a `logs` table:
 
-```scala
-import org.apache.iceberg.Schema
-import org.apache.iceberg.types.Types._
+```java
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types;
 
-val schema = new Schema(
-    NestedField.required(1, "level", StringType.get()),
-    NestedField.required(2, "event_time", TimestampType.withZone()),
-    NestedField.required(3, "message", StringType.get()),
-    NestedField.optional(4, "call_stack", ListType.ofRequired(5, StringType.get()))
-  )
+Schema schema = new Schema(
+      Types.NestedField.required(1, "level", Types.StringType.get()),
+      Types.NestedField.required(2, "event_time", Types.TimestampType.withZone()),
+      Types.NestedField.required(3, "message", Types.StringType.get()),
+      Types.NestedField.optional(4, "call_stack", Types.ListType.ofRequired(5, Types.StringType.get()))
+    );
 ```
 
 When using the Iceberg API directly, type IDs are required. Conversions from other schema formats, like Spark, Avro, and Parquet will automatically assign new IDs.
@@ -108,26 +102,25 @@ When a table is created, all IDs in the schema are re-assigned to ensure uniquen
 
 To create an Iceberg schema from an existing Avro schema, use converters in `AvroSchemaUtil`:
 
-```scala
-import org.apache.avro.Schema.Parser
-import org.apache.iceberg.avro.AvroSchemaUtil
+```java
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Parser;
+import org.apache.iceberg.avro.AvroSchemaUtil;
 
-val avroSchema = new Parser().parse("""{"type": "record", ... }""")
-
-val icebergSchema = AvroSchemaUtil.toIceberg(avroSchema)
+Schema avroSchema = new Parser().parse("{\"type\": \"record\" , ... }");
+Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
 ```
 
 ### Convert a schema from Spark
 
 To create an Iceberg schema from an existing table, use converters in `SparkSchemaUtil`:
 
-```scala
-import org.apache.iceberg.spark.SparkSchemaUtil
+```java
+import org.apache.iceberg.spark.SparkSchemaUtil;
 
-val schema = SparkSchemaUtil.convert(spark.table("db.table").schema)
+Schema schema = SparkSchemaUtil.schemaForTable(sparkSession, table_name);
 ```
 
-
 ## Partitioning
 
 ### Create a partition spec
@@ -136,11 +129,13 @@ Partition specs describe how Iceberg should group records into data files. Parti
 
 This example creates a partition spec for the `logs` table that partitions records by the hour of the log event's timestamp and by log level:
 
-```scala
-import org.apache.iceberg.PartitionSpec
+```java
+import org.apache.iceberg.PartitionSpec;
 
-val spec = PartitionSpec.builderFor(schema)
-                        .hour("event_time")
-                        .identity("level")
-                        .build()
+PartitionSpec spec = PartitionSpec.builderFor(schema)
+      .hour("event_time")
+      .identity("level")
+      .build();
 ```
+
+For more information on the different partition transforms that Iceberg offers, visit [this page](../spec#partitioning).
diff --git a/site/mkdocs.yml b/site/mkdocs.yml
index 9c4a6e6..68ad141 100644
--- a/site/mkdocs.yml
+++ b/site/mkdocs.yml
@@ -46,26 +46,27 @@ nav:
     - Disclaimer: disclaimer.md
     - How to Release: how-to-release.md
   - User docs:
+    - Getting Started: getting-started.md
+    - Configuration: configuration.md
     - Schemas: schemas.md
     - Partitioning: partitioning.md
-    - Configuration: configuration.md
     - Performance: performance.md
     - Reliability: reliability.md
     - Table evolution: evolution.md
-    - Time travel: spark#time-travel
-    - Quickstart: api-quickstart.md
+    - Time Travel: spark#time-travel
+    - Spark Quickstart: api-quickstart.md
     - Spark: spark.md
     - Presto: presto.md
   - Java:
     - Git Repo: https://github.com/apache/incubator-iceberg
-    - Quickstart: api-quickstart.md
+    - Quickstart: java-api-quickstart.md
     - API intro: api.md
     - Javadoc: /javadoc/
     - Custom Catalog: custom-catalog.md
   - Python:
     - Git Repo: https://github.com/apache/incubator-iceberg/tree/master/python
     - Quickstart: python-quickstart.md
-    - API intro: python-api-intro.md
+    - API Intro: python-api-intro.md
     - Feature Support: python-feature-support.md
   - Format:
     - Definitions: terms.md
diff --git a/spark/src/test/java/org/apache/iceberg/examples/ConcurrencyTest.java b/spark/src/test/java/org/apache/iceberg/examples/ConcurrencyTest.java
new file mode 100644
index 0000000..4c1530c
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/examples/ConcurrencyTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.examples;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.commons.io.FileUtils;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+/**
+ * This class tests how Iceberg handles concurrency when reading and writing at the same time
+ */
+public class ConcurrencyTest {
+
+  private static final Logger log = LoggerFactory.getLogger(ConcurrencyTest.class);
+
+  private Schema schema = new Schema(
+      optional(1, "key", Types.LongType.get()),
+      optional(2, "value", Types.StringType.get())
+  );
+  private SparkSession spark;
+  private File tableLocation;
+  private Table table;
+
+  private List<SimpleRecord> data = new ArrayList<>();
+
+  @Before
+  public void before() throws IOException {
+    tableLocation = Files.createTempDirectory("temp").toFile();
+
+    spark = SparkSession.builder().master("local[2]").getOrCreate();
+    spark.sparkContext().setLogLevel("WARN");
+
+    HadoopTables tables = new HadoopTables(spark.sparkContext().hadoopConfiguration());
+    table = tables.create(schema, tableLocation.toString());
+
+    for (int i = 0; i < 1000000; i++) {
+      data.add(new SimpleRecord(1, "bdp"));
+    }
+
+    log.info("End of setup phase");
+  }
+
+  /**
+   * The test creates 500 read tasks and one really long write (writing 1 mil rows)
+   * and uses threading to call the tasks concurrently.
+   */
+  @Test
+  public void writingAndReadingConcurrently() throws InterruptedException {
+    ExecutorService threadPool = Executors.newFixedThreadPool(5);
+    List<Callable<Void>> tasks = new ArrayList<>();
+
+    Callable<Void> write = () -> writeToTable(data);
+    tasks.add(write);
+
+    for (int i = 0; i < 500; i++) {
+      Callable<Void> getReads = () -> readTable();
+      tasks.add(getReads);
+    }
+
+    threadPool.invokeAll(tasks);
+    threadPool.shutdown();
+
+    table.refresh();
+    readTable();
+  }
+
+  private Void readTable() {
+    Dataset<Row> results = spark.read()
+        .format("iceberg")
+        .load(tableLocation.toString());
+
+    log.info("" + results.count());
+    return null;
+  }
+  private Void writeToTable(List<SimpleRecord> writeData) {
+    log.info("WRITING!");
+    Dataset<Row> df = spark.createDataFrame(writeData, SimpleRecord.class);
+    df.select("key",  "value").write()
+        .format("iceberg")
+        .mode("append")
+        .save(tableLocation.toString());
+    return null;
+  }
+
+  @After
+  public void after() throws IOException {
+    spark.stop();
+    FileUtils.deleteDirectory(tableLocation);
+  }
+}
diff --git a/spark/src/test/java/org/apache/iceberg/examples/README.md b/spark/src/test/java/org/apache/iceberg/examples/README.md
new file mode 100644
index 0000000..6bab285
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/examples/README.md
@@ -0,0 +1,194 @@
+# Iceberg Java API Examples (with Spark)
+
+## About
+Welcome! :smile:
+
+If you've stumbled across this module, hopefully you're looking for some guidance on how to get started with the [Apache Iceberg](https://iceberg.apache.org/) table format. This set of classes collects code examples of how to use the Iceberg Java API with Spark, along with some extra detail here in the README.
+
+The examples are structured as JUnit tests that you can download and run locally if you want to mess around with Iceberg yourself. 
+
+## Using Iceberg 
+### Maven
+If you'd like to try out Iceberg in your own project using Spark, you can use the `iceberg-spark-runtime` dependency:
+```xml
+   <dependency>
+     <groupId>org.apache.iceberg</groupId>
+     <artifactId>iceberg-spark-runtime</artifactId>
+     <version>${iceberg.version}</version>
+   </dependency>
+```
+
+You'll also need `spark-sql`:
+```xml
+  <dependency> 
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-sql_2.11</artifactId>
+    <version>2.4.4</version>
+  </dependency>
+```
+
+### Gradle
+To add a dependency on Iceberg in Gradle, add the following to `build.gradle`:
+```
+dependencies {
+  compile 'org.apache.iceberg:iceberg-core:0.7.0-incubating'
+}
+```
+
+## Key features investigated
+The following section will break down the different areas of Iceberg explored in the examples, with links to the code and extra information that could be useful for new users. 
+
+### Writing data to tables
+There are multiple ways of creating tables with Iceberg, including using the Hive Metastore to keep track of tables ([HiveCatalog](https://iceberg.apache.org/api-quickstart/#using-a-hive-catalog)), or using HDFS / your local file system ([HadoopTables](https://iceberg.incubator.apache.org/api-quickstart/#using-hadoop-tables)) to store the tables. However, it should be noted that directory tables (such as those using `HadoopTables`)  don’t support all catalog operations, like rename and t [...]
+It should be noted that `HadoopTables` _shouldn’t_ be used with file systems that do not support atomic rename as Iceberg depends on this to synchronize concurrent commits. 
+To limit complexity, these examples create tables on your local file system using the `HadoopTables` class.
+
+To create an Iceberg `Table` you will need to use the Iceberg API to create a `Schema` and `PartitionSpec` which you use with a Spark `DataFrameWriter`.
+
+Code examples can be found [here](ReadAndWriteTablesTest.java).
+
+#### A quick look at file structures
+It could be interesting to note that when writing partitioned data, Iceberg will layout your files in a similar manner to Hive:
+
+``` 
+├── data
+│   ├── published_month=2017-09
+│   │   └── 00000-1-5cbc72f6-7c1a-45e4-bb26-bc30deaca247-00002.parquet
+│   ├── published_month=2018-09
+│   │   └── 00000-1-5cbc72f6-7c1a-45e4-bb26-bc30deaca247-00001.parquet
+│   ├── published_month=2018-11
+│   │   └── 00000-1-5cbc72f6-7c1a-45e4-bb26-bc30deaca247-00000.parquet
+│   └── published_month=null
+│       └── 00000-1-5cbc72f6-7c1a-45e4-bb26-bc30deaca247-00003.parquet
+└── metadata
+    └── version-hint.text
+```
+**WARNING** 
+It should be noted that it is not possible to just drag-and-drop data files into an Iceberg table like the one shown above and expect to see your data in the table. 
+Each file is tracked individually and is managed by Iceberg, and so must be written into the table using the Iceberg API. 
+
+### Reading data from tables
+Reading Iceberg tables is fairly simple using the Spark `DataFrameReader`.
+
+Code examples can be found [here](ReadAndWriteTablesTest.java).
+
+### A look at the metadata
+This section looks a little bit closer at the metadata produced by Iceberg tables. Consider an example where you've written some data to a table. Your files will look something like this:
+
+``` 
+├── data
+│   └── ...
+└── metadata
+    ├── 51accd1d-39c7-4a6e-8f35-9e05f7c67864-m0.avro
+    ├── snap-1335014336004891572-1-51accd1d-39c7-4a6e-8f35-9e05f7c67864.avro
+    ├── v1.metadata.json
+    ├── v2.metadata.json
+    └── version-hint.text
+```
+
+The metadata for your table is kept in json files and each commit to a table will produce a new metadata file. For tables using a metastore for the metadata, the file used is whichever file the metastore points at. For `HadoopTables`, the file used will be the latest version available. Look [here](https://iceberg.incubator.apache.org/spec/#table-metadata) for more information on metadata.
+
+The metadata file will contain things like the table location, the schema and the partition spec:
+
+```json
+{
+  "format-version" : 1,
+  "table-uuid" : "f31aa6d7-acc3-4365-b737-4ef028a60bc1",
+  "location" : "/var/folders/sg/ypkyhl2s0p18qcd10ddpkn0c0000gn/T/temp5216691795982307214",
+  "last-updated-ms" : 1572972868185,
+  "last-column-id" : 2,
+  "schema" : {
+    "type" : "struct",
+    "fields" : [ {
+      ...
+    } ]
+  },
+  "partition-spec" : [ {
+    ...
+  } ],
+  "default-spec-id" : 0,
+  "partition-specs" : [ {
+    ...
+    } ]
+  } ],
+  "properties" : { },
+  "current-snapshot-id" : -1,
+  "snapshots" : [ ],
+  "snapshot-log" : [ ]
+}
+```
+
+When you then add your first chunk of data, you get a new version of the metadata (`v2.metadata.json`) that is the same as the first version except for the snapshot section at the bottom, which gets updated to:
+
+```json
+"current-snapshot-id" : 8405273199394950821,
+  "snapshots" : [ {
+    "snapshot-id" : 8405273199394950821,
+    "timestamp-ms" : 1572972873293,
+    "summary" : {
+      "operation" : "append",
+      "spark.app.id" : "local-1572972867758",
+      "added-data-files" : "4",
+      "added-records" : "4",
+      "changed-partition-count" : "4",
+      "total-records" : "4",
+      "total-data-files" : "4"
+    },
+    "manifest-list" : "/var/folders/sg/ypkyhl2s0p18qcd10ddpkn0c0000gn/T/temp5216691795982307214/metadata/snap-8405273199394950821-1-5706fc75-31e1-404e-aa23-b493387e2e32.avro"
+  } ],
+  "snapshot-log" : [ {
+    "timestamp-ms" : 1572972873293,
+    "snapshot-id" : 8405273199394950821
+  } ]
+```
+
+Here you get information on the data you have just written to the table, such as `added-records` and `added-data-files` as well as where the manifest list is located. 
+
+
+### Snapshot based functionality
+Iceberg uses [snapshots](https://iceberg.apache.org/terms/#snapshot) as part of its implementation, and provides a lot of useful functionality from this, such as **time travel**.
+
+- Iceberg creates a new snapshot for all table operations that modify the table, such as appends and overwrites.
+- You are able to access the whole list of snapshots generated for a table.
+- Iceberg will store all snapshots generated until you delete the snapshots using the `ExpireSnapshots` API. Currently, this must be called by the user.
+    - **NOTE**: A VACUUM operation with Spark is in the works for a future release to make this process easier. 
+    - You can delete all snapshots earlier than a certain timestamp.
+    - You can delete snaphots based on `SnapshotID` values.
+- You can read data from an old snapshot using the `SnapshotID` or a timestamp value ([time travel](https://iceberg.apache.org/spark/#time-travel)).
+- You can roll back your data to an earlier snapshot.
+
+Code examples can be found [here](SnapshotFunctionalityTest.java).
+
+### Table schema evolution
+Iceberg provides support to handle schema evolution of your tables over time:
+
+1. Add a new column
+    1. The new column is always added at the end of the table (**NOTE**: This will be fixed with Spark 3.0 which has implemented AFTER and FIRST operations).
+    1. You are only able to add a column at the end of the schema, not somewhere in the middle. 
+    1. Any rows using the earlier schema return a `null` value for this new column. You cannot use an alternative default value.
+    1. This column automatically becomes an `optional` column, meaning adding data to this column isn't enforced for each future write. 
+1. Delete a column
+    1. When you delete a column, that column will no longer be available in any of your previous snapshots. So, use this with caution :sweat_smile: 
+1. Update a column
+    1. Certain type promotions can be made (such as `int` -> `long`). For a definitive list, see the [official documentation](https://iceberg.apache.org/spec/#schemas-and-data-types).
+1. Rename a column
+    1. When you rename a column, it will appear renamed in all earlier versions of snapshots. 
+
+Code examples can be found [here](SchemaEvolutionTest.java).
+
+### Optimistic concurrency
+[Optimistic concurrency](https://en.wikipedia.org/wiki/Optimistic_concurrency_control) is when a system assumes that multiple writers can write to the same table without interfering with each other. This is usually used in environments where there is low data contention. It means that locking of the table isn't used, allowing multiple writers to  write to the table at the same time. 
+
+However, this means you need to occasionally deal with concurrent writer conflicts. This is when multiple writers start writing to a table at the same time, but one finishes first and commits an update. Then when the second writer tries to commit it has to throw an error because the table isn't in the same state as it was when it started writing.
+
+Iceberg deals with this by attempting retries of the write based on the new metadata. This can happen if the files the first write changed aren't touched by the second write, then it's deemed safe to commit the second update. 
+
+[This test](ConcurrencyTest.java) looks to experiment with how optimistic concurrency works. For more information on conflict resolution, look [here](https://iceberg.incubator.apache.org/spec/#table-metadata) and for information on write concurrency, look [here](https://iceberg.incubator.apache.org/reliability/#concurrent-write-operations).
+
+By default, Iceberg has set the `commit.retry.num-retries` property to **4**. You can edit this default by creating an `UpdateProperties` object and assigning a new number to that property:
+
+```java
+  table.updateProperties().set("commit.retry.num-retries", "1").commit();
+```
+
+You can find more information on other table properties you can configure [here](https://iceberg.incubator.apache.org/configuration/#table-properties).
diff --git a/spark/src/test/java/org/apache/iceberg/examples/ReadAndWriteTablesTest.java b/spark/src/test/java/org/apache/iceberg/examples/ReadAndWriteTablesTest.java
new file mode 100644
index 0000000..0f49574
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/examples/ReadAndWriteTablesTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.examples;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataTypes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+/**
+ * This test class uses Spark to create partitioned and unpartitioned tables locally.
+ */
+public class ReadAndWriteTablesTest {
+
+  private SparkSession spark;
+  private Table table;
+  private HadoopTables tables;
+  private File pathToTable;
+  private Schema schema;
+
+  @Before
+  public void before() throws IOException {
+    spark = SparkSession.builder().master("local[2]").getOrCreate();
+
+    pathToTable = Files.createTempDirectory("temp").toFile();
+    tables = new HadoopTables(spark.sparkContext().hadoopConfiguration());
+
+    schema = new Schema(
+        optional(1, "id", Types.IntegerType.get()),
+        optional(2, "data", Types.StringType.get())
+    );
+  }
+
+  @Test
+  public void createUnpartitionedTable() {
+    table = tables.create(schema, pathToTable.toString());
+
+    List<SimpleRecord> expected = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+    df.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(pathToTable.toString());
+
+    table.refresh();
+  }
+
+  @Test
+  public void createPartitionedTable() {
+    PartitionSpec spec = PartitionSpec.builderFor(schema)
+        .identity("id")
+        .build();
+
+    table = tables.create(schema, spec, pathToTable.toString());
+
+    List<SimpleRecord> expected = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+    df.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(pathToTable.toString());
+
+    table.refresh();
+  }
+
+  @Test
+  public void writeDataFromJsonFile() {
+    Schema bookSchema = new Schema(
+        optional(1, "title", Types.StringType.get()),
+        optional(2, "price", Types.LongType.get()),
+        optional(3, "author", Types.StringType.get()),
+        optional(4, "published", Types.TimestampType.withZone()),
+        optional(5, "genre", Types.StringType.get())
+    );
+
+    table = tables.create(bookSchema, pathToTable.toString());
+
+    Dataset<Row> df = spark.read().json("src/test/resources/data/books.json");
+
+    df.select(df.col("title"), df.col("price"), df.col("author"),
+        df.col("published").cast(DataTypes.TimestampType), df.col("genre")).write()
+        .format("iceberg")
+        .mode("append")
+        .save(pathToTable.toString());
+
+    table.refresh();
+  }
+
+  @Test
+  public void readFromIcebergTableWithSpark() {
+    table = tables.create(schema, pathToTable.toString());
+
+    Dataset<Row> results = spark.read()
+        .format("iceberg")
+        .load(pathToTable.toString());
+
+    results.createOrReplaceTempView("table");
+    spark.sql("select * from table").show();
+  }
+
+  @Test
+  public void readFromPartitionedTableWithFilter() {
+    table = tables.create(schema, pathToTable.toString());
+
+    Dataset<Row> results = spark.read()
+        .format("iceberg")
+        .load(pathToTable.toString())
+        .filter("data != \"b\"");
+
+    results.createOrReplaceTempView("table");
+    spark.sql("SELECT * FROM table").show();
+  }
+
+  @After
+  public void after() throws IOException {
+    FileUtils.deleteDirectory(pathToTable);
+    spark.stop();
+  }
+}
diff --git a/spark/src/test/java/org/apache/iceberg/examples/SchemaEvolutionTest.java b/spark/src/test/java/org/apache/iceberg/examples/SchemaEvolutionTest.java
new file mode 100644
index 0000000..f689f53
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/examples/SchemaEvolutionTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.examples;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataTypes;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+/**
+ * This class tests how you can evolve your table schema with Iceberg.
+ * This includes things like adding, deleting, renaming columns and type promotions.
+ */
+public class SchemaEvolutionTest {
+
+  private static final Logger log = LoggerFactory.getLogger(SchemaEvolutionTest.class);
+
+  private SparkSession spark;
+  private Table table;
+  private File tableLocation;
+  private String dataLocation = "src/test/resources/data/";
+
+  @Before
+  public void before() throws IOException {
+    spark = SparkSession.builder().master("local[2]").getOrCreate();
+    tableLocation = Files.createTempDirectory("temp").toFile();
+    Schema schema = new Schema(
+        optional(1, "title", Types.StringType.get()),
+        optional(2, "price", Types.IntegerType.get()),
+        optional(3, "author", Types.StringType.get()),
+        optional(4, "published", Types.TimestampType.withZone()),
+        optional(5, "genre", Types.StringType.get())
+    );
+    PartitionSpec spec = PartitionSpec.builderFor(schema)
+        .year("published")
+        .build();
+
+    HadoopTables tables = new HadoopTables(spark.sparkContext().hadoopConfiguration());
+    table = tables.create(schema, spec, tableLocation.toString());
+
+    Dataset<Row> df = spark.read().json(dataLocation + "/books.json");
+
+    df.select(df.col("title"), df.col("price").cast(DataTypes.IntegerType),
+        df.col("author"), df.col("published").cast(DataTypes.TimestampType),
+        df.col("genre")).write()
+        .format("iceberg")
+        .mode("append")
+        .save(tableLocation.toString());
+
+    table.refresh();
+  }
+
+  @Test
+  public void addColumnToSchema() {
+    table.updateSchema().addColumn("publisher", Types.StringType.get()).commit();
+
+    Dataset<Row> df2 = spark.read().json(dataLocation + "new-books.json");
+
+    df2.select(df2.col("title"), df2.col("price").cast(DataTypes.IntegerType),
+        df2.col("author"), df2.col("published").cast(DataTypes.TimestampType),
+        df2.col("genre"), df2.col("publisher")).write()
+        .format("iceberg")
+        .mode("append")
+        .save(tableLocation.toString());
+  }
+
+  @Test
+  public void deleteColumnFromSchema() {
+    table.updateSchema().deleteColumn("genre").commit();
+
+    table.refresh();
+    Dataset<Row> results = spark.read()
+        .format("iceberg")
+        .load(tableLocation.toString());
+
+    results.createOrReplaceTempView("table");
+    spark.sql("select * from table").show();
+  }
+
+  @Test
+  public void renameColumn() {
+    table.updateSchema().renameColumn("author", "writer").commit();
+
+    table.refresh();
+    Dataset<Row> results = spark.read()
+        .format("iceberg")
+        .load(tableLocation.toString());
+
+    results.createOrReplaceTempView("table");
+    spark.sql("select * from table").show();
+  }
+
+  @Test
+  public void updateColumnTypeIntToLong() {
+    table.updateSchema().updateColumn("price", Types.LongType.get()).commit();
+
+    log.info("Promote int type to long type:\n" + table.schema().toString());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void updateColumnTypeIntToString() {
+    table.updateSchema().updateColumn("price", Types.StringType.get()).commit();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void updateColumnTypeStringToInt() {
+    table.updateSchema().updateColumn("author", Types.IntegerType.get()).commit();
+  }
+
+  @Test
+  public void floatToDouble() throws IOException {
+    // Set up a new table to test this conversion
+    Schema schema = new Schema(optional(1, "float", Types.FloatType.get()));
+    File location = Files.createTempDirectory("temp").toFile();
+    HadoopTables tables = new HadoopTables(spark.sparkContext().hadoopConfiguration());
+    Table floatTable = tables.create(schema, location.toString());
+
+    floatTable.updateSchema().updateColumn("float", Types.DoubleType.get()).commit();
+
+    log.info("Promote float type to double type:\n" + floatTable.schema().toString());
+  }
+
+  @Test
+  public void widenDecimalPrecision() throws IOException {
+    // Set up a new table to test this conversion
+    Schema schema = new Schema(optional(1, "decimal", Types.DecimalType.of(2, 2)));
+    File location = Files.createTempDirectory("temp").toFile();
+    HadoopTables tables = new HadoopTables(spark.sparkContext().hadoopConfiguration());
+    Table decimalTable = tables.create(schema, location.toString());
+
+    decimalTable.updateSchema().updateColumn("decimal", Types.DecimalType.of(4, 2)).commit();
+
+    log.info("Widen decimal type:\n" + decimalTable.schema().toString());
+  }
+
+  @Test
+  public void after() throws IOException {
+    spark.stop();
+    FileUtils.deleteDirectory(tableLocation);
+  }
+}
diff --git a/spark/src/test/java/org/apache/iceberg/examples/SimpleRecord.java b/spark/src/test/java/org/apache/iceberg/examples/SimpleRecord.java
new file mode 100644
index 0000000..f7394f3
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/examples/SimpleRecord.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.examples;
+
+import com.google.common.base.Objects;
+
+public class SimpleRecord {
+  private Integer id;
+  private String data;
+
+  public SimpleRecord() {
+  }
+
+  SimpleRecord(Integer id, String data) {
+    this.id = id;
+    this.data = data;
+  }
+
+  public Integer getId() {
+    return id;
+  }
+
+  public void setId(Integer id) {
+    this.id = id;
+  }
+
+  public String getData() {
+    return data;
+  }
+
+  public void setData(String data) {
+    this.data = data;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    SimpleRecord record = (SimpleRecord) o;
+    return Objects.equal(id, record.id) && Objects.equal(data, record.data);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(id, data);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder buffer = new StringBuilder();
+    buffer.append("{\"id\"=");
+    buffer.append(id);
+    buffer.append(",\"data\"=\"");
+    buffer.append(data);
+    buffer.append("\"}");
+    return buffer.toString();
+  }
+}
diff --git a/spark/src/test/java/org/apache/iceberg/examples/SnapshotFunctionalityTest.java b/spark/src/test/java/org/apache/iceberg/examples/SnapshotFunctionalityTest.java
new file mode 100644
index 0000000..4649d9a
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/examples/SnapshotFunctionalityTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.examples;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+/**
+ * This class tests the snapshot functionality available with Iceberg.
+ * This includes things like time-travel, rollback and retrieving metadata.
+ */
+public class SnapshotFunctionalityTest {
+
+  private static final Logger log = LoggerFactory.getLogger(SnapshotFunctionalityTest.class);
+
+  private Table table;
+  private File tableLocation;
+  private SparkSession spark = null;
+
+  @Before
+  public void before() throws IOException {
+    Schema schema = new Schema(
+        optional(1, "id", Types.IntegerType.get()),
+        optional(2, "data", Types.StringType.get())
+    );
+
+    spark = SparkSession.builder().master("local[2]").getOrCreate();
+
+    tableLocation = Files.createTempDirectory("temp").toFile();
+
+    HadoopTables tables = new HadoopTables(spark.sparkContext().hadoopConfiguration());
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    table = tables.create(schema, spec, tableLocation.toString());
+
+    List<SimpleRecord> expected = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+    for (int i = 0; i < 5; i++) {
+      df.select("id", "data").write()
+          .format("iceberg")
+          .mode("append")
+          .save(tableLocation.toString());
+    }
+    table.refresh();
+  }
+
+  @Test
+  public void rollbackToPreviousSnapshotAndReadData() {
+    long oldId = table.history().get(0).snapshotId();
+
+    table.rollback().toSnapshotId(oldId).commit();
+    table.refresh();
+
+    Dataset<Row> results = spark.read()
+        .format("iceberg")
+        .load(tableLocation.toString());
+
+    results.createOrReplaceTempView("table");
+    spark.sql("select * from table").show();
+  }
+
+  @Test
+  public void expireOldSnapshotWithSnapshotID() {
+    long oldId = table.history().get(0).snapshotId();
+
+    table.expireSnapshots().expireSnapshotId(oldId).commit();
+    table.refresh();
+
+    Iterator<Snapshot> iterator = table.snapshots().iterator();
+    List<Snapshot> snapshots = IteratorUtils.toList(iterator);
+  }
+
+  /**
+   * Expires anything older than a given timestamp, NOT including that timestamp.
+   */
+  @Test
+  public void retireAllSnapshotsOlderThanTimestamp() {
+    long secondLatestTimestamp = table.history().get(2).timestampMillis();
+    Iterator<Snapshot> beforeIterator = table.snapshots().iterator();
+    List<Snapshot> beforeSnapshots = IteratorUtils.toList(beforeIterator);
+
+    //Delete the 2 oldest snapshots
+    table.expireSnapshots().expireOlderThan(secondLatestTimestamp).commit();
+    table.refresh();
+
+    Iterator<Snapshot> afterIterator = table.snapshots().iterator();
+    List<Snapshot> afterSnapshots = IteratorUtils.toList(afterIterator);
+  }
+
+  @Test
+  public void getInfoAboutFilesAddedFromSnapshot() {
+    Snapshot snapshot = table.currentSnapshot();
+    Iterable<DataFile> addedFiles = snapshot.addedFiles();
+
+    for (DataFile dataFile : addedFiles) {
+      log.info("File path: " + dataFile.path());
+      log.info("File format: " + dataFile.format());
+      log.info("File size in bytes: " + dataFile.fileSizeInBytes());
+      log.info("Record count: " + dataFile.recordCount());
+    }
+  }
+
+  @After
+  public void after() throws IOException {
+    FileUtils.deleteDirectory(tableLocation);
+    spark.stop();
+  }
+}
diff --git a/spark/src/test/resources/data/books.json b/spark/src/test/resources/data/books.json
new file mode 100644
index 0000000..902b4e3
--- /dev/null
+++ b/spark/src/test/resources/data/books.json
@@ -0,0 +1,6 @@
+{"title":"Gone", "price":12, "author": "Michael Grant", "published": 1541776051, "genre": "fiction"}
+{"title":"Carry On", "price":10, "author": "Rainbow Rowell", "published": 1536505651, "genre": "fiction"}
+{"title":"Warward Son", "price":12, "author": "Rainbow Rowell", "published": 1504969651, "genre": "fiction"}
+{"title":"Heroes", "price":8, "author": "Stephen Fry", "published": 1504969651, "genre": "fiction"}
+{"title":"Vietnam", "price":15, "author": "Max Hastings", "genre": "non-fiction"}
+
diff --git a/spark/src/test/resources/data/new-books.json b/spark/src/test/resources/data/new-books.json
new file mode 100644
index 0000000..3418151
--- /dev/null
+++ b/spark/src/test/resources/data/new-books.json
@@ -0,0 +1,4 @@
+{"title":"Harry Potter", "price":12, "author": "JK Rowling", "published": 1570719361, "genre": "fiction", "publisher": "ACME Books"}
+{"title":"Percy Jackson", "price":10, "author": "Rick Riordan", "published": 1547132161, "genre": "fiction", "publisher": "ACME Books"}
+{"title":"Cookie", "price":8, "author": "Jacqueline Wilson", "published": 1552229761, "genre": "fiction", "publisher": "ACME Books"}
+{"title":"Fangirl", "price":12, "author": "Rainbow Rowell", "published": 1552229761, "genre": "fiction", "publisher": "ACME Books"}