You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/10/23 20:47:53 UTC
svn commit: r1633917 [2/2] - in /hive/branches/spark: ./ spark-client/
spark-client/src/ spark-client/src/main/ spark-client/src/main/java/
spark-client/src/main/java/org/ spark-client/src/main/java/org/apache/
spark-client/src/main/java/org/apache/hiv...
Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client.metrics;
+
+import java.io.Serializable;
+
+import org.apache.spark.executor.TaskMetrics;
+
+/**
+ * Metrics pertaining to reading shuffle data.
+ */
+public class ShuffleReadMetrics implements Serializable {
+
+  /** How many shuffle blocks tasks fetched from remote locations. */
+  public final int remoteBlocksFetched;
+  /** How many shuffle blocks tasks fetched locally. */
+  public final int localBlocksFetched;
+  /**
+   * Time tasks spent blocked waiting for remote shuffle blocks. Only time spent
+   * actually blocking on shuffle input is counted: if block B is still being
+   * fetched while the task has not finished processing block A, the task is not
+   * considered to be blocked on block B.
+   */
+  public final long fetchWaitTime;
+  /** Total remote bytes tasks read from the shuffle. */
+  public final long remoteBytesRead;
+
+  /**
+   * Builds an immutable snapshot from explicit values.
+   *
+   * @param remoteBlocksFetched number of remote blocks fetched
+   * @param localBlocksFetched number of local blocks fetched
+   * @param fetchWaitTime time spent waiting for remote blocks
+   * @param remoteBytesRead number of remote bytes read
+   */
+  public ShuffleReadMetrics(
+      int remoteBlocksFetched,
+      int localBlocksFetched,
+      long fetchWaitTime,
+      long remoteBytesRead) {
+    this.remoteBytesRead = remoteBytesRead;
+    this.fetchWaitTime = fetchWaitTime;
+    this.localBlocksFetched = localBlocksFetched;
+    this.remoteBlocksFetched = remoteBlocksFetched;
+  }
+
+  /**
+   * Copies the shuffle read values out of Spark's task metrics.
+   *
+   * NOTE(review): get() is called on shuffleReadMetrics() unconditionally; presumably this
+   * fails when the task recorded no shuffle read metrics - confirm callers check first.
+   *
+   * @param metrics the Spark task metrics to copy from
+   */
+  public ShuffleReadMetrics(TaskMetrics metrics) {
+    this(metrics.shuffleReadMetrics().get().remoteBlocksFetched(),
+        metrics.shuffleReadMetrics().get().localBlocksFetched(),
+        metrics.shuffleReadMetrics().get().fetchWaitTime(),
+        metrics.shuffleReadMetrics().get().remoteBytesRead());
+  }
+
+  /**
+   * Returns the combined number of blocks fetched in shuffles, local plus remote.
+   */
+  public int getTotalBlocksFetched() {
+    return localBlocksFetched + remoteBlocksFetched;
+  }
+
+}
Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client.metrics;
+
+import java.io.Serializable;
+
+import org.apache.spark.executor.TaskMetrics;
+
+/**
+ * Metrics pertaining to writing shuffle data.
+ */
+public class ShuffleWriteMetrics implements Serializable {
+
+  /** Bytes tasks wrote for the shuffle. */
+  public final long shuffleBytesWritten;
+  /** Nanoseconds tasks spent blocked on writes to disk or to the buffer cache. */
+  public final long shuffleWriteTime;
+
+  /**
+   * Builds an immutable snapshot from explicit values.
+   *
+   * @param shuffleBytesWritten number of bytes written for the shuffle
+   * @param shuffleWriteTime time spent blocking on shuffle writes, in nanoseconds
+   */
+  public ShuffleWriteMetrics(
+      long shuffleBytesWritten,
+      long shuffleWriteTime) {
+    this.shuffleWriteTime = shuffleWriteTime;
+    this.shuffleBytesWritten = shuffleBytesWritten;
+  }
+
+  /**
+   * Copies the shuffle write values out of Spark's task metrics.
+   *
+   * NOTE(review): get() is called on shuffleWriteMetrics() unconditionally; presumably this
+   * fails when the task recorded no shuffle write metrics - confirm callers check first.
+   *
+   * @param metrics the Spark task metrics to copy from
+   */
+  public ShuffleWriteMetrics(TaskMetrics metrics) {
+    this(metrics.shuffleWriteMetrics().get().shuffleBytesWritten(),
+        metrics.shuffleWriteMetrics().get().shuffleWriteTime());
+  }
+
+}
Added: hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java (added)
+++ hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import java.util.Arrays;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.apache.hive.spark.client.metrics.*;
+
+public class TestMetricsCollection {
+
+ @Test
+ public void testMetricsAggregation() {
+   MetricsCollection collection = new MetricsCollection();
+   // Populate 2 jobs x 2 stages per job x 2 tasks per stage.
+   for (int jobId = 1; jobId <= 2; jobId++) {
+     for (int stageId = 1; stageId <= 2; stageId++) {
+       for (long taskId = 1L; taskId <= 2L; taskId++) {
+         collection.addMetrics(jobId, stageId, taskId, makeMetrics(jobId, stageId, taskId));
+       }
+     }
+   }
+
+   // Every registered id must be reported back at each level.
+   assertEquals(ImmutableSet.of(1, 2), collection.getJobIds());
+   assertEquals(ImmutableSet.of(1, 2), collection.getStageIds(1));
+   assertEquals(ImmutableSet.of(1L, 2L), collection.getTaskIds(1, 1));
+
+   // A single task reports its own value.
+   checkMetrics(collection.getTaskMetrics(1, 1, 2), taskValue(1, 1, 2));
+
+   // A stage is the sum of its two tasks.
+   checkMetrics(collection.getStageMetrics(2, 1), stageValue(2, 1, 2));
+
+   // A job is the sum of its two stages.
+   checkMetrics(collection.getJobMetrics(1), jobValue(1, 2, 2));
+
+   // The global view is the sum of both jobs.
+   checkMetrics(collection.getAllMetrics(), globalValue(2, 2, 2));
+ }
+
+ @Test
+ public void testOptionalMetrics() {
+   long value = taskValue(1, 1, 1L);
+   // Metrics carrying none of the optional input / shuffle sections.
+   Metrics withoutOptionals = new Metrics(value, value, value, value, value, value, value,
+       Optional.<InputMetrics>absent(),
+       Optional.<ShuffleReadMetrics>absent(),
+       Optional.<ShuffleWriteMetrics>absent());
+
+   MetricsCollection collection = new MetricsCollection();
+   for (int jobId = 1; jobId <= 2; jobId++) {
+     collection.addMetrics(jobId, 1, 1, withoutOptionals);
+   }
+
+   // Nothing optional was added, so the aggregate has nothing optional either.
+   Metrics allAbsent = collection.getAllMetrics();
+   assertFalse(allAbsent.inputMetrics.isPresent());
+   assertFalse(allAbsent.shuffleReadMetrics.isPresent());
+   assertFalse(allAbsent.shuffleWriteMetrics.isPresent());
+
+   // One task with every optional section makes them all appear in the aggregate.
+   collection.addMetrics(3, 1, 1, makeMetrics(3, 1, 1));
+
+   Metrics withOptionals = collection.getAllMetrics();
+   assertTrue(withOptionals.inputMetrics.isPresent());
+   assertEquals(taskValue(3, 1, 1), withOptionals.inputMetrics.get().bytesRead);
+
+   assertTrue(withOptionals.shuffleReadMetrics.isPresent());
+   assertTrue(withOptionals.shuffleWriteMetrics.isPresent());
+ }
+
+ @Test
+ public void testInputReadMethodAggregation() {
+   long value = taskValue(1, 1, 1);
+   // Two tasks that differ only in how their input was read.
+   Metrics fromMemory = new Metrics(value, value, value, value, value, value, value,
+       Optional.fromNullable(new InputMetrics(DataReadMethod.Memory, value)),
+       Optional.<ShuffleReadMetrics>absent(),
+       Optional.<ShuffleWriteMetrics>absent());
+   Metrics fromDisk = new Metrics(value, value, value, value, value, value, value,
+       Optional.fromNullable(new InputMetrics(DataReadMethod.Disk, value)),
+       Optional.<ShuffleReadMetrics>absent(),
+       Optional.<ShuffleWriteMetrics>absent());
+
+   MetricsCollection collection = new MetricsCollection();
+   collection.addMetrics(1, 1, 1, fromMemory);
+   collection.addMetrics(1, 1, 2, fromDisk);
+
+   // Mixing read methods is expected to aggregate to DataReadMethod.Multiple.
+   Metrics aggregated = collection.getAllMetrics();
+   assertTrue(aggregated.inputMetrics.isPresent());
+   assertEquals(DataReadMethod.Multiple, aggregated.inputMetrics.get().readMethod);
+ }
+
+ /**
+  * Builds a Metrics instance for the given task in which every metric, including the
+  * optional input and shuffle sections, carries the same value.
+  *
+  * @param jobId job the task belongs to
+  * @param stageId stage the task belongs to
+  * @param taskId id of the task
+  * @return metrics whose every field equals taskValue(jobId, stageId, taskId)
+  */
+ private Metrics makeMetrics(int jobId, int stageId, long taskId) {
+   // Derive the value from taskValue() so generated and expected values cannot drift apart.
+   long value = taskValue(jobId, stageId, taskId);
+   return new Metrics(value, value, value, value, value, value, value,
+       Optional.fromNullable(new InputMetrics(DataReadMethod.Memory, value)),
+       // The block counters are ints, hence the narrowing casts.
+       Optional.fromNullable(new ShuffleReadMetrics((int) value, (int) value, value, value)),
+       Optional.fromNullable(new ShuffleWriteMetrics(value, value)));
+ }
+
+ /**
+ * Returns the synthetic value used for every metric of one task:
+ * 1000000 * jobId + 1000 * stageId + taskId.
+ *
+ * The metric values of a task will all be the same. This makes it easy to calculate the
+ * aggregated values of jobs and stages without fancy math.
+ *
+ * @param jobId job the task belongs to
+ * @param stageId stage the task belongs to
+ * @param taskId id of the task
+ * @return the value encoding the three ids
+ */
+ private long taskValue(int jobId, int stageId, long taskId) {
+ return 1000000 * jobId + 1000 * stageId + taskId;
+ }
+
+ /**
+  * Expected aggregate for a stage: the sum of taskValue() over task ids 1..taskCount.
+  */
+ private long stageValue(int jobId, int stageId, int taskCount) {
+   long total = 0;
+   int taskId = 1;
+   while (taskId <= taskCount) {
+     total += taskValue(jobId, stageId, taskId);
+     taskId++;
+   }
+   return total;
+ }
+
+ /**
+  * Expected aggregate for a job: the sum of stageValue() over stage ids 1..stageCount.
+  */
+ private long jobValue(int jobId, int stageCount, int tasksPerStage) {
+   long total = 0;
+   int stageId = 1;
+   while (stageId <= stageCount) {
+     total += stageValue(jobId, stageId, tasksPerStage);
+     stageId++;
+   }
+   return total;
+ }
+
+ /**
+  * Expected aggregate over everything: the sum of jobValue() over job ids 1..jobCount.
+  */
+ private long globalValue(int jobCount, int stagesPerJob, int tasksPerStage) {
+   long total = 0;
+   int jobId = 1;
+   while (jobId <= jobCount) {
+     total += jobValue(jobId, stagesPerJob, tasksPerStage);
+     jobId++;
+   }
+   return total;
+ }
+
+ /**
+  * Asserts that every numeric field of the given metrics, including the input and
+  * shuffle sections, equals the expected value, and that the input was read from memory.
+  */
+ private void checkMetrics(Metrics metrics, long expected) {
+   // Top-level task metrics.
+   assertEquals(expected, metrics.executorDeserializeTime);
+   assertEquals(expected, metrics.executorRunTime);
+   assertEquals(expected, metrics.resultSize);
+   assertEquals(expected, metrics.jvmGCTime);
+   assertEquals(expected, metrics.resultSerializationTime);
+   assertEquals(expected, metrics.memoryBytesSpilled);
+   assertEquals(expected, metrics.diskBytesSpilled);
+
+   // Input section.
+   InputMetrics input = metrics.inputMetrics.get();
+   assertEquals(DataReadMethod.Memory, input.readMethod);
+   assertEquals(expected, input.bytesRead);
+
+   // Shuffle read section.
+   ShuffleReadMetrics shuffleRead = metrics.shuffleReadMetrics.get();
+   assertEquals(expected, shuffleRead.remoteBlocksFetched);
+   assertEquals(expected, shuffleRead.localBlocksFetched);
+   assertEquals(expected, shuffleRead.fetchWaitTime);
+   assertEquals(expected, shuffleRead.remoteBytesRead);
+
+   // Shuffle write section.
+   ShuffleWriteMetrics shuffleWrite = metrics.shuffleWriteMetrics.get();
+   assertEquals(expected, shuffleWrite.shuffleBytesWritten);
+   assertEquals(expected, shuffleWrite.shuffleWriteTime);
+ }
+
+}
Added: hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java (added)
+++ hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java Thu Oct 23 18:47:52 2014
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.ZipEntry;
+import java.util.jar.JarOutputStream;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Strings;
+import com.google.common.io.ByteStreams;
+import org.apache.spark.FutureAction;
+import org.apache.spark.SparkFiles;
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestSparkClient {
+
+ // Timeouts are bad... mmmkay.
+ private static final long TIMEOUT = 10;
+
+ /**
+  * Builds the client configuration for a test run.
+  *
+  * @param local whether to set ClientUtils.CONF_KEY_IN_PROCESS; otherwise the JVM's class
+  *     path is passed on as the driver and executor extra class path
+  * @return a fresh, mutable configuration map
+  */
+ private Map<String, String> createConf(boolean local) {
+   Map<String, String> conf = new HashMap<String, String>();
+   // Both modes use the "local" master.
+   conf.put("spark.master", "local");
+
+   if (!local) {
+     String classpath = System.getProperty("java.class.path");
+     conf.put("spark.app.name", "SparkClientSuite Remote App");
+     conf.put("spark.driver.extraClassPath", classpath);
+     conf.put("spark.executor.extraClassPath", classpath);
+   } else {
+     conf.put(ClientUtils.CONF_KEY_IN_PROCESS, "true");
+     conf.put("spark.app.name", "SparkClientSuite Local App");
+   }
+
+   // Forward spark.home only when the JVM was started with one.
+   String sparkHome = System.getProperty("spark.home");
+   if (!Strings.isNullOrEmpty(sparkHome)) {
+     conf.put("spark.home", sparkHome);
+   }
+
+   return conf;
+ }
+
+ @Test
+ public void testJobSubmission() throws Exception {
+ runTest(true, new TestFunction() {
+ @Override
+ public void call(SparkClient client) throws Exception {
+ JobHandle<String> handle = client.submit(new SimpleJob());
+ assertEquals("hello", handle.get(TIMEOUT, TimeUnit.SECONDS));
+ }
+ });
+ }
+
+ @Test
+ public void testSimpleSparkJob() throws Exception {
+ runTest(true, new TestFunction() {
+ @Override
+ public void call(SparkClient client) throws Exception {
+ JobHandle<Long> handle = client.submit(new SparkJob());
+ assertEquals(Long.valueOf(5L), handle.get(TIMEOUT, TimeUnit.SECONDS));
+ }
+ });
+ }
+
+ /**
+  * Submits a job that always throws and checks that the failure reaches the caller as an
+  * ExecutionException whose cause is the job's IllegalStateException.
+  *
+  * NOTE(review): assumes the client hands the job's exception back unwrapped as the
+  * cause - adjust the cause check if the client wraps it in another type.
+  */
+ @Test
+ public void testErrorJob() throws Exception {
+   runTest(true, new TestFunction() {
+     @Override
+     public void call(SparkClient client) throws Exception {
+       JobHandle<String> handle = client.submit(new ErrorJob());
+       try {
+         handle.get(TIMEOUT, TimeUnit.SECONDS);
+         // Reaching this line means the failure was swallowed somewhere.
+         fail("Should have thrown an exception.");
+       } catch (ExecutionException ee) {
+         assertTrue(ee.getCause() instanceof IllegalStateException);
+       }
+     }
+   });
+ }
+
+ @Test
+ public void testRemoteClient() throws Exception {
+ runTest(false, new TestFunction() {
+ @Override
+ public void call(SparkClient client) throws Exception {
+ JobHandle<Long> handle = client.submit(new SparkJob());
+ assertEquals(Long.valueOf(5L), handle.get(TIMEOUT, TimeUnit.SECONDS));
+ }
+ });
+ }
+
+ /**
+  * Runs two monitored jobs and checks that each handle reports exactly one Spark job id,
+  * that the two id sets differ, and that executor run time was recorded for both.
+  */
+ @Test
+ public void testMetricsCollection() throws Exception {
+   TestFunction body = new TestFunction() {
+     @Override
+     public void call(SparkClient client) throws Exception {
+       JobHandle<Integer> first = client.submit(new AsyncSparkJob());
+       first.get(TIMEOUT, TimeUnit.SECONDS);
+       MetricsCollection firstMetrics = first.getMetrics();
+       assertEquals(1, firstMetrics.getJobIds().size());
+       assertTrue(firstMetrics.getAllMetrics().executorRunTime > 0L);
+
+       JobHandle<Integer> second = client.submit(new AsyncSparkJob());
+       second.get(TIMEOUT, TimeUnit.SECONDS);
+       MetricsCollection secondMetrics = second.getMetrics();
+       assertEquals(1, secondMetrics.getJobIds().size());
+       // The second handle must not report the first handle's job id.
+       assertFalse(Objects.equal(firstMetrics.getJobIds(), secondMetrics.getJobIds()));
+       assertTrue(secondMetrics.getAllMetrics().executorRunTime > 0L);
+     }
+   };
+   runTest(true, body);
+ }
+
+ /**
+  * Checks that a jar added through the client is visible to tasks as a class path
+  * resource, and that a file added through the client can be opened by tasks via
+  * SparkFiles. Both temporary files are deleted afterwards.
+  */
+ @Test
+ public void testAddJarsAndFiles() throws Exception {
+   runTest(true, new TestFunction() {
+     @Override
+     public void call(SparkClient client) throws Exception {
+       File jar = null;
+       File file = null;
+
+       try {
+         // Test that adding a jar to the remote context makes it show up in the classpath.
+         jar = File.createTempFile("test", ".jar");
+
+         JarOutputStream jarFile = new JarOutputStream(new FileOutputStream(jar));
+         try {
+           jarFile.putNextEntry(new ZipEntry("test.resource"));
+           jarFile.write("test resource".getBytes("UTF-8"));
+           jarFile.closeEntry();
+         } finally {
+           // Close even when writing fails so the temp file can be deleted.
+           jarFile.close();
+         }
+
+         client.addJar(new URL("file:" + jar.getAbsolutePath()))
+             .get(TIMEOUT, TimeUnit.SECONDS);
+
+         // Need to run a Spark job to make sure the jar is added to the class loader. Monitoring
+         // SparkContext#addJar() doesn't mean much, we can only be sure jars have been distributed
+         // when we run a task after the jar has been added.
+         String result = client.submit(new JarJob()).get(TIMEOUT, TimeUnit.SECONDS);
+         assertEquals("test resource", result);
+
+         // Test that adding a file to the remote context makes it available to executors.
+         file = File.createTempFile("test", ".file");
+
+         FileOutputStream fileStream = new FileOutputStream(file);
+         try {
+           fileStream.write("test file".getBytes("UTF-8"));
+         } finally {
+           fileStream.close();
+         }
+
+         // Plain files go through addFile(), not addJar(): FileJob resolves them with SparkFiles.
+         client.addFile(new URL("file:" + file.getAbsolutePath()))
+             .get(TIMEOUT, TimeUnit.SECONDS);
+
+         // The same applies to files added with "addFile". They're only guaranteed to be available
+         // to tasks started after the addFile() call completes.
+         result = client.submit(new FileJob(file.getName()))
+             .get(TIMEOUT, TimeUnit.SECONDS);
+         assertEquals("test file", result);
+       } finally {
+         if (jar != null) {
+           jar.delete();
+         }
+         if (file != null) {
+           file.delete();
+         }
+       }
+     }
+   });
+ }
+
+ @Test
+ public void testKryoSerializer() throws Exception {
+ runTest(true, new TestFunction() {
+ @Override
+ public void call(SparkClient client) throws Exception {
+ JobHandle<Long> handle = client.submit(new SparkJob());
+ assertEquals(Long.valueOf(5L), handle.get(TIMEOUT, TimeUnit.SECONDS));
+ }
+
+ @Override void config(Map<String, String> conf) {
+ conf.put(ClientUtils.CONF_KEY_SERIALIZER, "kryo");
+ }
+ });
+ }
+
+
+ /**
+  * Runs one test body against a freshly created client.
+  *
+  * The factory is initialized with the base configuration, the test may then adjust the
+  * configuration through TestFunction#config, and the client is created from the adjusted
+  * configuration. The client and the factory are always stopped afterwards.
+  *
+  * @param local passed to createConf to choose the configuration
+  * @param test the test body to run
+  * @throws Exception whatever the test body or the client throws
+  */
+ private void runTest(boolean local, TestFunction test) throws Exception {
+   Map<String, String> conf = createConf(local);
+   SparkClientFactory.initialize(conf);
+   SparkClient client = null;
+   try {
+     test.config(conf);
+     client = SparkClientFactory.createClient(conf);
+     test.call(client);
+   } finally {
+     try {
+       if (client != null) {
+         client.stop();
+       }
+     } finally {
+       // Stop the factory even if stopping the client throws, so later tests start clean.
+       SparkClientFactory.stop();
+     }
+   }
+ }
+
+ /** Job that ignores its context and returns the constant "hello". */
+ private static class SimpleJob implements Job<String> {
+
+ @Override
+ public String call(JobContext jc) {
+ return "hello";
+ }
+
+ }
+
+ /** Job that parallelizes the integers 1..5 and returns how many elements the RDD has. */
+ private static class SparkJob implements Job<Long> {
+
+   @Override
+   public Long call(JobContext jc) {
+     return jc.sc().parallelize(Arrays.asList(1, 2, 3, 4, 5)).count();
+   }
+
+ }
+
+ /**
+  * Job that runs an asynchronous no-op foreach over the integers 1..5, registers the
+  * resulting action with JobContext#monitor, waits for it and returns 1.
+  */
+ private static class AsyncSparkJob implements Job<Integer> {
+
+   @Override
+   public Integer call(JobContext jc) throws Exception {
+     // The per-element work is intentionally empty; only the action itself matters.
+     VoidFunction<Integer> noOp = new VoidFunction<Integer>() {
+       @Override
+       public void call(Integer l) throws Exception {
+
+       }
+     };
+
+     JavaRDD<Integer> rdd = jc.sc().parallelize(Arrays.asList(1, 2, 3, 4, 5));
+     JavaFutureAction<?> future = jc.monitor(rdd.foreachAsync(noOp));
+     future.get(TIMEOUT, TimeUnit.SECONDS);
+
+     return 1;
+   }
+
+ }
+
+ /** Job that always fails by throwing an IllegalStateException. */
+ private static class ErrorJob implements Job<String> {
+
+ @Override
+ public String call(JobContext jc) {
+ throw new IllegalStateException("This job does not work.");
+ }
+
+ }
+
+ /**
+  * Job that runs a single task which reads the resource "test.resource" through the
+  * thread's context class loader and returns its content decoded as UTF-8.
+  */
+ private static class JarJob implements Job<String>, Function<Integer, String> {
+
+   @Override
+   public String call(JobContext jc) {
+     // One element, so exactly one task runs the Function below.
+     return jc.sc().parallelize(Arrays.asList(1)).map(this).collect().get(0);
+   }
+
+   @Override
+   public String call(Integer i) throws Exception {
+     ClassLoader ccl = Thread.currentThread().getContextClassLoader();
+     InputStream in = ccl.getResourceAsStream("test.resource");
+     if (in == null) {
+       // getResourceAsStream() reports a missing resource with null; fail with a clear message.
+       throw new IllegalStateException("Resource test.resource not found in class path.");
+     }
+     try {
+       byte[] bytes = ByteStreams.toByteArray(in);
+       return new String(bytes, 0, bytes.length, "UTF-8");
+     } finally {
+       in.close();
+     }
+   }
+
+ }
+
+ /**
+  * Job that runs a single task which opens the named file at the path returned by
+  * SparkFiles.get and returns its content decoded as UTF-8.
+  */
+ private static class FileJob implements Job<String>, Function<Integer, String> {
+
+   /** Name of the file to look up through SparkFiles. */
+   private final String fileName;
+
+   FileJob(String fileName) {
+     this.fileName = fileName;
+   }
+
+   @Override
+   public String call(JobContext jc) {
+     // One element, so exactly one task runs the Function below.
+     return jc.sc().parallelize(Arrays.asList(1)).map(this).collect().get(0);
+   }
+
+   @Override
+   public String call(Integer i) throws Exception {
+     InputStream in = new FileInputStream(SparkFiles.get(fileName));
+     try {
+       byte[] bytes = ByteStreams.toByteArray(in);
+       return new String(bytes, 0, bytes.length, "UTF-8");
+     } finally {
+       // Close the stream even when reading fails.
+       in.close();
+     }
+   }
+
+ }
+
+ /**
+ * Body of a single test, executed by runTest() against a live client.
+ */
+ private static abstract class TestFunction {
+ /** Runs the test against the given client. */
+ abstract void call(SparkClient client) throws Exception;
+ /** Hook to adjust the configuration before the client is created; does nothing by default. */
+ void config(Map<String, String> conf) { }
+ }
+
+}
Added: hive/branches/spark/spark-client/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/test/resources/log4j.properties?rev=1633917&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/test/resources/log4j.properties (added)
+++ hive/branches/spark/spark-client/src/test/resources/log4j.properties Thu Oct 23 18:47:52 2014
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Log everything at DEBUG level and above to the console (System.err).
+log4j.rootCategory=DEBUG, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n