You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/10/23 20:47:53 UTC

svn commit: r1633917 [2/2] - in /hive/branches/spark: ./ spark-client/ spark-client/src/ spark-client/src/main/ spark-client/src/main/java/ spark-client/src/main/java/org/ spark-client/src/main/java/org/apache/ spark-client/src/main/java/org/apache/hiv...

Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client.metrics;
+
+import java.io.Serializable;
+
+import org.apache.spark.executor.TaskMetrics;
+
+/**
+ * Immutable holder for the counters that describe how tasks read shuffle data.
+ */
+public class ShuffleReadMetrics implements Serializable {
+
+  /** How many remote shuffle blocks tasks fetched. */
+  public final int remoteBlocksFetched;
+  /** How many local shuffle blocks tasks fetched. */
+  public final int localBlocksFetched;
+  /**
+   * Time tasks spent waiting for remote shuffle blocks. Only time spent blocked
+   * on shuffle input is counted: when block B is still being fetched while the
+   * task has not finished processing block A, the task is not considered to be
+   * blocked on block B.
+   */
+  public final long fetchWaitTime;
+  /** Total number of remote bytes tasks read from the shuffle. */
+  public final long remoteBytesRead;
+
+  /**
+   * Creates an instance from explicit counter values.
+   *
+   * @param remoteBlocksFetched number of remote blocks fetched
+   * @param localBlocksFetched number of local blocks fetched
+   * @param fetchWaitTime time spent waiting for remote shuffle blocks
+   * @param remoteBytesRead number of remote bytes read
+   */
+  public ShuffleReadMetrics(int remoteBlocksFetched, int localBlocksFetched,
+      long fetchWaitTime, long remoteBytesRead) {
+    this.remoteBlocksFetched = remoteBlocksFetched;
+    this.localBlocksFetched = localBlocksFetched;
+    this.fetchWaitTime = fetchWaitTime;
+    this.remoteBytesRead = remoteBytesRead;
+  }
+
+  /**
+   * Copies the shuffle read counters out of a Spark {@link TaskMetrics}.
+   * NOTE(review): the shuffle read metrics are unwrapped with {@code get()} without a
+   * presence check, so this presumably requires them to be defined - confirm with callers.
+   *
+   * @param metrics task metrics to read the counters from
+   */
+  public ShuffleReadMetrics(TaskMetrics metrics) {
+    this(
+        metrics.shuffleReadMetrics().get().remoteBlocksFetched(),
+        metrics.shuffleReadMetrics().get().localBlocksFetched(),
+        metrics.shuffleReadMetrics().get().fetchWaitTime(),
+        metrics.shuffleReadMetrics().get().remoteBytesRead());
+  }
+
+  /**
+   * Returns the number of blocks fetched in shuffle by tasks, remote and local combined.
+   */
+  public int getTotalBlocksFetched() {
+    return localBlocksFetched + remoteBlocksFetched;
+  }
+
+}

Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client.metrics;
+
+import java.io.Serializable;
+
+import org.apache.spark.executor.TaskMetrics;
+
+/**
+ * Immutable holder for the counters that describe how tasks write shuffle data.
+ */
+public class ShuffleWriteMetrics implements Serializable {
+
+  /** Number of bytes tasks wrote for the shuffle. */
+  public final long shuffleBytesWritten;
+  /** Time, in nanoseconds, tasks spent blocking on writes to disk or buffer cache. */
+  public final long shuffleWriteTime;
+
+  /**
+   * Creates an instance from explicit counter values.
+   *
+   * @param shuffleBytesWritten number of bytes written for the shuffle
+   * @param shuffleWriteTime time spent blocking on shuffle writes
+   */
+  public ShuffleWriteMetrics(long shuffleBytesWritten, long shuffleWriteTime) {
+    this.shuffleBytesWritten = shuffleBytesWritten;
+    this.shuffleWriteTime = shuffleWriteTime;
+  }
+
+  /**
+   * Copies the shuffle write counters out of a Spark {@link TaskMetrics}.
+   * NOTE(review): the shuffle write metrics are unwrapped with {@code get()} without a
+   * presence check, so this presumably requires them to be defined - confirm with callers.
+   *
+   * @param metrics task metrics to read the counters from
+   */
+  public ShuffleWriteMetrics(TaskMetrics metrics) {
+    this(
+        metrics.shuffleWriteMetrics().get().shuffleBytesWritten(),
+        metrics.shuffleWriteMetrics().get().shuffleWriteTime());
+  }
+
+}

Added: hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java (added)
+++ hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import java.util.Arrays;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.apache.hive.spark.client.metrics.*;
+
+public class TestMetricsCollection {
+
+  /**
+   * Registers metrics for 2 jobs, 2 stages per job and 2 tasks per stage, then checks the
+   * ids and the values reported at task, stage, job and global granularity.
+   */
+  @Test
+  public void testMetricsAggregation() {
+    MetricsCollection metricsCollection = new MetricsCollection();
+    for (int jobId = 1; jobId <= 2; jobId++) {
+      for (int stageId = 1; stageId <= 2; stageId++) {
+        for (long taskId = 1L; taskId <= 2L; taskId++) {
+          Metrics taskMetrics = makeMetrics(jobId, stageId, taskId);
+          metricsCollection.addMetrics(jobId, stageId, taskId, taskMetrics);
+        }
+      }
+    }
+
+    // Every level of the hierarchy must know the ids registered beneath it.
+    assertEquals(ImmutableSet.of(1, 2), metricsCollection.getJobIds());
+    assertEquals(ImmutableSet.of(1, 2), metricsCollection.getStageIds(1));
+    assertEquals(ImmutableSet.of(1L, 2L), metricsCollection.getTaskIds(1, 1));
+
+    // Single task: job 1, stage 1, task 2.
+    checkMetrics(metricsCollection.getTaskMetrics(1, 1, 2), taskValue(1, 1, 2));
+
+    // One stage: job 2, stage 1, both tasks.
+    checkMetrics(metricsCollection.getStageMetrics(2, 1), stageValue(2, 1, 2));
+
+    // One job: job 1, both stages with both tasks each.
+    checkMetrics(metricsCollection.getJobMetrics(1), jobValue(1, 2, 2));
+
+    // Everything that was registered.
+    checkMetrics(metricsCollection.getAllMetrics(), globalValue(2, 2, 2));
+  }
+
+  /**
+   * Checks that the optional metric groups stay absent in the global metrics while no task
+   * reported them, and become present once a task that carries them is added.
+   */
+  @Test
+  public void testOptionalMetrics() {
+    long base = taskValue(1, 1, 1L);
+    Metrics withoutOptionals = new Metrics(base, base, base, base, base, base, base,
+        Optional.<InputMetrics>absent(),
+        Optional.<ShuffleReadMetrics>absent(),
+        Optional.<ShuffleWriteMetrics>absent());
+
+    MetricsCollection metricsCollection = new MetricsCollection();
+    for (int jobId : Arrays.asList(1, 2)) {
+      metricsCollection.addMetrics(jobId, 1, 1, withoutOptionals);
+    }
+
+    Metrics before = metricsCollection.getAllMetrics();
+    assertFalse(before.inputMetrics.isPresent());
+    assertFalse(before.shuffleReadMetrics.isPresent());
+    assertFalse(before.shuffleWriteMetrics.isPresent());
+
+    // A third job whose single task reports all optional groups.
+    metricsCollection.addMetrics(3, 1, 1, makeMetrics(3, 1, 1));
+
+    Metrics after = metricsCollection.getAllMetrics();
+    assertTrue(after.inputMetrics.isPresent());
+    assertEquals(taskValue(3, 1, 1), after.inputMetrics.get().bytesRead);
+
+    assertTrue(after.shuffleReadMetrics.isPresent());
+    assertTrue(after.shuffleWriteMetrics.isPresent());
+  }
+
+  /**
+   * Checks that combining input metrics recorded with two different read methods reports
+   * {@code DataReadMethod.Multiple} in the global metrics.
+   */
+  @Test
+  public void testInputReadMethodAggregation() {
+    long base = taskValue(1, 1, 1);
+
+    Optional<InputMetrics> fromMemory =
+      Optional.fromNullable(new InputMetrics(DataReadMethod.Memory, base));
+    Metrics memoryTask = new Metrics(base, base, base, base, base, base, base,
+      fromMemory,
+      Optional.<ShuffleReadMetrics>absent(),
+      Optional.<ShuffleWriteMetrics>absent());
+
+    Optional<InputMetrics> fromDisk =
+      Optional.fromNullable(new InputMetrics(DataReadMethod.Disk, base));
+    Metrics diskTask = new Metrics(base, base, base, base, base, base, base,
+      fromDisk,
+      Optional.<ShuffleReadMetrics>absent(),
+      Optional.<ShuffleWriteMetrics>absent());
+
+    // Two tasks of the same stage, each with its own read method.
+    MetricsCollection metricsCollection = new MetricsCollection();
+    metricsCollection.addMetrics(1, 1, 1, memoryTask);
+    metricsCollection.addMetrics(1, 1, 2, diskTask);
+
+    Metrics combined = metricsCollection.getAllMetrics();
+    assertTrue(combined.inputMetrics.isPresent());
+    assertEquals(DataReadMethod.Multiple, combined.inputMetrics.get().readMethod);
+  }
+
+  /**
+   * Builds a fully populated {@code Metrics} in which every field, including the optional
+   * input and shuffle groups, carries the value derived from the given ids.
+   */
+  private Metrics makeMetrics(int jobId, int stageId, long taskId) {
+    long v = taskValue(jobId, stageId, taskId);
+    InputMetrics input = new InputMetrics(DataReadMethod.Memory, v);
+    ShuffleReadMetrics shuffleRead = new ShuffleReadMetrics((int) v, (int) v, v, v);
+    ShuffleWriteMetrics shuffleWrite = new ShuffleWriteMetrics(v, v);
+    return new Metrics(v, v, v, v, v, v, v,
+      Optional.fromNullable(input),
+      Optional.fromNullable(shuffleRead),
+      Optional.fromNullable(shuffleWrite));
+  }
+
+  /**
+   * Value used for every metric of a single task. Because all metrics of a task share one
+   * value, the expected aggregates for stages and jobs are plain sums of these values.
+   */
+  private long taskValue(int jobId, int stageId, long taskId) {
+    int jobAndStagePart = 1000000 * jobId + 1000 * stageId;
+    return jobAndStagePart + taskId;
+  }
+
+  /** Sum of the task values of tasks 1..taskCount of the given stage. */
+  private long stageValue(int jobId, int stageId, int taskCount) {
+    long total = 0;
+    int taskId = 1;
+    while (taskId <= taskCount) {
+      total += taskValue(jobId, stageId, taskId);
+      taskId++;
+    }
+    return total;
+  }
+
+  /** Sum of the stage values of stages 1..stageCount of the given job. */
+  private long jobValue(int jobId, int stageCount, int tasksPerStage) {
+    long total = 0;
+    int stageId = 1;
+    while (stageId <= stageCount) {
+      total += stageValue(jobId, stageId, tasksPerStage);
+      stageId++;
+    }
+    return total;
+  }
+
+  /** Sum of the job values of jobs 1..jobCount. */
+  private long globalValue(int jobCount, int stagesPerJob, int tasksPerStage) {
+    long total = 0;
+    int jobId = 1;
+    while (jobId <= jobCount) {
+      total += jobValue(jobId, stagesPerJob, tasksPerStage);
+      jobId++;
+    }
+    return total;
+  }
+
+  /**
+   * Asserts that every numeric field of {@code metrics}, including those of the input and
+   * shuffle groups, equals {@code expected}, and that the input read method is Memory.
+   * The optional groups are unwrapped with {@code get()}, so they are expected to be present.
+   */
+  private void checkMetrics(Metrics metrics, long expected) {
+    // Top-level task counters.
+    assertEquals(expected, metrics.executorDeserializeTime);
+    assertEquals(expected, metrics.executorRunTime);
+    assertEquals(expected, metrics.resultSize);
+    assertEquals(expected, metrics.jvmGCTime);
+    assertEquals(expected, metrics.resultSerializationTime);
+    assertEquals(expected, metrics.memoryBytesSpilled);
+    assertEquals(expected, metrics.diskBytesSpilled);
+
+    // Input group.
+    assertEquals(DataReadMethod.Memory, metrics.inputMetrics.get().readMethod);
+    assertEquals(expected, metrics.inputMetrics.get().bytesRead);
+
+    // Shuffle read group.
+    assertEquals(expected, metrics.shuffleReadMetrics.get().remoteBlocksFetched);
+    assertEquals(expected, metrics.shuffleReadMetrics.get().localBlocksFetched);
+    assertEquals(expected, metrics.shuffleReadMetrics.get().fetchWaitTime);
+    assertEquals(expected, metrics.shuffleReadMetrics.get().remoteBytesRead);
+
+    // Shuffle write group.
+    assertEquals(expected, metrics.shuffleWriteMetrics.get().shuffleBytesWritten);
+    assertEquals(expected, metrics.shuffleWriteMetrics.get().shuffleWriteTime);
+  }
+
+}

Added: hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java (added)
+++ hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.ZipEntry;
+import java.util.jar.JarOutputStream;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Strings;
+import com.google.common.io.ByteStreams;
+import org.apache.spark.FutureAction;
+import org.apache.spark.SparkFiles;
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestSparkClient {
+
+  // Timeouts are bad... mmmkay.
+  // Upper bound, in seconds, for the blocking get() calls in this suite; every use passes
+  // TimeUnit.SECONDS alongside it.
+  private static final long TIMEOUT = 10;
+
+  /**
+   * Builds the client configuration for a test run.
+   *
+   * @param local {@code true} to set the in-process flag; {@code false} to configure the
+   *     remote variant, which passes the current JVM classpath to driver and executors.
+   * @return a new, mutable configuration map
+   */
+  private Map<String, String> createConf(boolean local) {
+    Map<String, String> settings = new HashMap<String, String>();
+    // Both variants use the "local" master; only the remaining settings differ.
+    settings.put("spark.master", "local");
+    if (!local) {
+      String classpath = System.getProperty("java.class.path");
+      settings.put("spark.app.name", "SparkClientSuite Remote App");
+      settings.put("spark.driver.extraClassPath", classpath);
+      settings.put("spark.executor.extraClassPath", classpath);
+    } else {
+      settings.put(ClientUtils.CONF_KEY_IN_PROCESS, "true");
+      settings.put("spark.app.name", "SparkClientSuite Local App");
+    }
+
+    // Forward spark.home only when the system property is set to a non-empty value.
+    String sparkHome = System.getProperty("spark.home");
+    if (!Strings.isNullOrEmpty(sparkHome)) {
+      settings.put("spark.home", sparkHome);
+    }
+
+    return settings;
+  }
+
+  /** Submits {@code SimpleJob} with the local configuration and checks the returned string. */
+  @Test
+  public void testJobSubmission() throws Exception {
+    TestFunction scenario = new TestFunction() {
+      @Override
+      public void call(SparkClient client) throws Exception {
+        String result = client.submit(new SimpleJob()).get(TIMEOUT, TimeUnit.SECONDS);
+        assertEquals("hello", result);
+      }
+    };
+    runTest(true, scenario);
+  }
+
+  /** Submits {@code SparkJob} with the local configuration and checks the element count. */
+  @Test
+  public void testSimpleSparkJob() throws Exception {
+    TestFunction scenario = new TestFunction() {
+      @Override
+      public void call(SparkClient client) throws Exception {
+        Long count = client.submit(new SparkJob()).get(TIMEOUT, TimeUnit.SECONDS);
+        assertEquals(Long.valueOf(5L), count);
+      }
+    };
+    runTest(true, scenario);
+  }
+
+  /**
+   * Submits a job that always throws and checks that the failure reaches the caller as an
+   * {@link ExecutionException} caused by an {@link IllegalStateException}.
+   */
+  @Test
+  public void testErrorJob() throws Exception {
+    runTest(true, new TestFunction() {
+      @Override
+      public void call(SparkClient client) throws Exception {
+        // Must be ErrorJob: a job that succeeds would never reach the catch block below.
+        JobHandle<String> handle = client.submit(new ErrorJob());
+        try {
+          handle.get(TIMEOUT, TimeUnit.SECONDS);
+          // Without this the test would pass silently if the job did not fail.
+          fail("Expected ErrorJob to fail with an ExecutionException.");
+        } catch (ExecutionException ee) {
+          assertTrue(ee.getCause() instanceof IllegalStateException);
+        }
+      }
+    });
+  }
+
+  /** Runs {@code SparkJob} with the non-local configuration and checks the element count. */
+  @Test
+  public void testRemoteClient() throws Exception {
+    TestFunction scenario = new TestFunction() {
+      @Override
+      public void call(SparkClient client) throws Exception {
+        JobHandle<Long> pending = client.submit(new SparkJob());
+        Long count = pending.get(TIMEOUT, TimeUnit.SECONDS);
+        assertEquals(Long.valueOf(5L), count);
+      }
+    };
+    runTest(false, scenario);
+  }
+
+  /**
+   * Runs {@code AsyncSparkJob} twice and checks that each handle reports metrics for exactly
+   * one job id, that the two runs report different job ids, and that run time was recorded.
+   */
+  @Test
+  public void testMetricsCollection() throws Exception {
+    TestFunction scenario = new TestFunction() {
+      @Override
+      public void call(SparkClient client) throws Exception {
+        JobHandle<Integer> firstRun = client.submit(new AsyncSparkJob());
+        firstRun.get(TIMEOUT, TimeUnit.SECONDS);
+        MetricsCollection firstMetrics = firstRun.getMetrics();
+        assertEquals(1, firstMetrics.getJobIds().size());
+        assertTrue(firstMetrics.getAllMetrics().executorRunTime > 0L);
+
+        JobHandle<Integer> secondRun = client.submit(new AsyncSparkJob());
+        secondRun.get(TIMEOUT, TimeUnit.SECONDS);
+        MetricsCollection secondMetrics = secondRun.getMetrics();
+        assertEquals(1, secondMetrics.getJobIds().size());
+        // The second submission must not be reported under the first one's job id.
+        assertFalse(Objects.equal(firstMetrics.getJobIds(), secondMetrics.getJobIds()));
+        assertTrue(secondMetrics.getAllMetrics().executorRunTime > 0L);
+      }
+    };
+    runTest(true, scenario);
+  }
+
+  /**
+   * Checks that a jar registered with addJar() becomes visible to tasks as a classpath
+   * resource, and that a file registered with addFile() can be read by tasks through
+   * {@code SparkFiles}. Both temporary files are deleted when the test finishes.
+   */
+  @Test
+  public void testAddJarsAndFiles() throws Exception {
+    runTest(true, new TestFunction() {
+      @Override
+      public void call(SparkClient client) throws Exception {
+        File jar = null;
+        File file = null;
+
+        try {
+          // Test that adding a jar to the remote context makes it show up in the classpath.
+          jar = File.createTempFile("test", ".jar");
+
+          JarOutputStream jarFile = new JarOutputStream(new FileOutputStream(jar));
+          try {
+            jarFile.putNextEntry(new ZipEntry("test.resource"));
+            jarFile.write("test resource".getBytes("UTF-8"));
+            jarFile.closeEntry();
+          } finally {
+            // Release the handle even if writing fails so the temp file can be deleted.
+            jarFile.close();
+          }
+
+          client.addJar(new URL("file:" + jar.getAbsolutePath()))
+            .get(TIMEOUT, TimeUnit.SECONDS);
+
+          // Need to run a Spark job to make sure the jar is added to the class loader. Monitoring
+          // SparkContext#addJar() doesn't mean much, we can only be sure jars have been distributed
+          // when we run a task after the jar has been added.
+          String result = client.submit(new JarJob()).get(TIMEOUT, TimeUnit.SECONDS);
+          assertEquals("test resource", result);
+
+          // Test that adding a file to the remote context makes it available to executors.
+          file = File.createTempFile("test", ".file");
+
+          FileOutputStream fileStream = new FileOutputStream(file);
+          try {
+            fileStream.write("test file".getBytes("UTF-8"));
+          } finally {
+            fileStream.close();
+          }
+
+          // Register the plain file with addFile(), not addJar(): FileJob looks it up through
+          // SparkFiles, which is the path this half of the test is meant to exercise.
+          client.addFile(new URL("file:" + file.getAbsolutePath()))
+            .get(TIMEOUT, TimeUnit.SECONDS);
+
+          // The same applies to files added with "addFile". They're only guaranteed to be available
+          // to tasks started after the addFile() call completes.
+          result = client.submit(new FileJob(file.getName()))
+            .get(TIMEOUT, TimeUnit.SECONDS);
+          assertEquals("test file", result);
+        } finally {
+          if (jar != null) {
+            jar.delete();
+          }
+          if (file != null) {
+            file.delete();
+          }
+        }
+      }
+    });
+  }
+
+  /**
+   * Runs {@code SparkJob} with the serializer setting switched to "kryo" and checks the
+   * element count.
+   */
+  @Test
+  public void testKryoSerializer() throws Exception {
+    TestFunction scenario = new TestFunction() {
+      @Override void config(Map<String, String> conf) {
+        conf.put(ClientUtils.CONF_KEY_SERIALIZER, "kryo");
+      }
+
+      @Override
+      public void call(SparkClient client) throws Exception {
+        Long count = client.submit(new SparkJob()).get(TIMEOUT, TimeUnit.SECONDS);
+        assertEquals(Long.valueOf(5L), count);
+      }
+    };
+    runTest(true, scenario);
+  }
+
+
+  /**
+   * Runs one test scenario against a freshly created client and tears everything down.
+   *
+   * The factory is initialized with the base configuration, the scenario may then adjust the
+   * configuration through {@code config}, and the client is created from the adjusted map.
+   *
+   * @param local passed to {@code createConf} to choose the local or non-local configuration
+   * @param test scenario to configure and execute
+   * @throws Exception whatever the scenario, client creation or teardown throws
+   */
+  private void runTest(boolean local, TestFunction test) throws Exception {
+    Map<String, String> conf = createConf(local);
+    SparkClientFactory.initialize(conf);
+    SparkClient client = null;
+    try {
+      test.config(conf);
+      client = SparkClientFactory.createClient(conf);
+      test.call(client);
+    } finally {
+      try {
+        if (client != null) {
+          client.stop();
+        }
+      } finally {
+        // Stop the factory even when stopping the client fails, so a broken client cannot
+        // leave the factory initialized for the next test.
+        SparkClientFactory.stop();
+      }
+    }
+  }
+
+  /** Job that ignores its context and returns a fixed string. */
+  private static class SimpleJob implements Job<String> {
+
+    @Override
+    public String call(JobContext jc) {
+      final String greeting = "hello";
+      return greeting;
+    }
+
+  }
+
+  /** Job that parallelizes five integers and returns how many elements the RDD holds. */
+  private static class SparkJob implements Job<Long> {
+
+    @Override
+    public Long call(JobContext jc) {
+      JavaRDD<Integer> numbers = jc.sc().parallelize(Arrays.asList(1, 2, 3, 4, 5));
+      long elementCount = numbers.count();
+      return elementCount;
+    }
+
+  }
+
+  /**
+   * Job that starts an asynchronous no-op foreach over five integers, registers the resulting
+   * action through {@code JobContext.monitor}, waits for it to finish and returns 1.
+   */
+  private static class AsyncSparkJob implements Job<Integer> {
+
+    @Override
+    public Integer call(JobContext jc) throws Exception {
+      VoidFunction<Integer> noOp = new VoidFunction<Integer>() {
+        @Override
+        public void call(Integer l) throws Exception {
+          // Intentionally empty: only the execution of the action matters.
+        }
+      };
+
+      JavaRDD<Integer> numbers = jc.sc().parallelize(Arrays.asList(1, 2, 3, 4, 5));
+      JavaFutureAction<?> pending = jc.monitor(numbers.foreachAsync(noOp));
+      pending.get(TIMEOUT, TimeUnit.SECONDS);
+
+      return 1;
+    }
+
+  }
+
+  /** Job that always fails by throwing an {@link IllegalStateException}. */
+  private static class ErrorJob implements Job<String> {
+
+    @Override
+    public String call(JobContext jc) {
+      final String reason = "This job does not work.";
+      throw new IllegalStateException(reason);
+    }
+
+  }
+
+  /**
+   * Job that runs a single-element map task which reads the classpath resource
+   * "test.resource" through the context class loader and returns its content as a string.
+   */
+  private static class JarJob implements Job<String>, Function<Integer, String> {
+
+    @Override
+    public String call(JobContext jc) {
+      return jc.sc().parallelize(Arrays.asList(1)).map(this).collect().get(0);
+    }
+
+    /**
+     * Reads "test.resource" from the context class loader, decoded as UTF-8.
+     *
+     * @throws IllegalStateException if the resource is not visible to the class loader
+     */
+    @Override
+    public String call(Integer i) throws Exception {
+      ClassLoader ccl = Thread.currentThread().getContextClassLoader();
+      InputStream in = ccl.getResourceAsStream("test.resource");
+      if (in == null) {
+        // Fail with a clear message instead of an opaque NullPointerException.
+        throw new IllegalStateException("test.resource not found on the task classpath.");
+      }
+      try {
+        byte[] bytes = ByteStreams.toByteArray(in);
+        return new String(bytes, 0, bytes.length, "UTF-8");
+      } finally {
+        // Close the stream even when reading fails.
+        in.close();
+      }
+    }
+
+  }
+
+  /**
+   * Job that runs a single-element map task which opens the named file through
+   * {@code SparkFiles.get} and returns its content as a string.
+   */
+  private static class FileJob implements Job<String>, Function<Integer, String> {
+
+    /** Name handed to {@code SparkFiles.get} to locate the file inside the task. */
+    private final String fileName;
+
+    FileJob(String fileName) {
+      this.fileName = fileName;
+    }
+
+    @Override
+    public String call(JobContext jc) {
+      return jc.sc().parallelize(Arrays.asList(1)).map(this).collect().get(0);
+    }
+
+    /** Reads the whole file, decoded as UTF-8. */
+    @Override
+    public String call(Integer i) throws Exception {
+      InputStream in = new FileInputStream(SparkFiles.get(fileName));
+      try {
+        byte[] bytes = ByteStreams.toByteArray(in);
+        return new String(bytes, 0, bytes.length, "UTF-8");
+      } finally {
+        // Close the stream even when reading fails.
+        in.close();
+      }
+    }
+
+  }
+
+  /**
+   * A test scenario executed by {@code runTest}: an optional configuration hook plus the
+   * body that is run against the created client.
+   */
+  private static abstract class TestFunction {
+    /** Hook for adjusting the configuration before the client is created; no-op by default. */
+    void config(Map<String, String> conf) {
+    }
+
+    /** Body of the scenario, executed with the client created for this run. */
+    abstract void call(SparkClient client) throws Exception;
+  }
+
+}

Added: hive/branches/spark/spark-client/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/test/resources/log4j.properties?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/test/resources/log4j.properties (added)
+++ hive/branches/spark/spark-client/src/test/resources/log4j.properties Thu Oct 23 18:47:52 2014
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Log everything at DEBUG level to the console (System.err)
+log4j.rootCategory=DEBUG, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n