Posted to mapreduce-commits@hadoop.apache.org by su...@apache.org on 2012/10/22 22:43:30 UTC
svn commit: r1401071 [3/7] - in
/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project: ./ conf/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/...
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestClientProtocolProviderImpls.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestClientProtocolProviderImpls.java?rev=1401071&r1=1401070&r2=1401071&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestClientProtocolProviderImpls.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestClientProtocolProviderImpls.java Mon Oct 22 20:43:16 2012
@@ -1,120 +1,120 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapreduce;
-
-import java.io.IOException;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.LocalJobRunner;
-import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.junit.Test;
-
-public class TestClientProtocolProviderImpls extends TestCase {
-
- @Test
- public void testClusterWithLocalClientProvider() throws Exception {
-
- Configuration conf = new Configuration();
-
- try {
- conf.set(MRConfig.FRAMEWORK_NAME, "incorrect");
- new Cluster(conf);
- fail("Cluster should not be initialized with incorrect framework name");
- } catch (IOException e) {
- // expected: invalid framework name
- }
-
- try {
- conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
- conf.set(JTConfig.JT_IPC_ADDRESS, "127.0.0.1:0");
-
- new Cluster(conf);
- fail("Cluster with Local Framework name should use local JT address");
- } catch (IOException e) {
- // expected: non-local JT address with the local framework
- }
-
- try {
- conf.set(JTConfig.JT_IPC_ADDRESS, "local");
- Cluster cluster = new Cluster(conf);
- assertTrue(cluster.getClient() instanceof LocalJobRunner);
- cluster.close();
- } catch (IOException e) {
-
- }
- }
-
- @Test
- public void testClusterWithJTClientProvider() throws Exception {
-
- Configuration conf = new Configuration();
- try {
- conf.set(MRConfig.FRAMEWORK_NAME, "incorrect");
- new Cluster(conf);
- fail("Cluster should not be initialized with incorrect framework name");
-
- } catch (IOException e) {
- // expected: invalid framework name
- }
-
- try {
- conf.set(MRConfig.FRAMEWORK_NAME, "classic");
- conf.set(JTConfig.JT_IPC_ADDRESS, "local");
- new Cluster(conf);
- fail("Cluster with classic Framework name shouldnot use local JT address");
-
- } catch (IOException e) {
- // expected: local JT address with the classic framework
- }
-
- try {
- conf = new Configuration();
- conf.set(MRConfig.FRAMEWORK_NAME, "classic");
- conf.set(JTConfig.JT_IPC_ADDRESS, "127.0.0.1:0");
- Cluster cluster = new Cluster(conf);
- cluster.close();
- } catch (IOException e) {
-
- }
- }
-
- @Test
- public void testClusterException() {
-
- Configuration conf = new Configuration();
- conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME);
- conf.set(JTConfig.JT_IPC_ADDRESS, "local");
-
- // Initializing a cluster with this conf should throw an error.
- // However, the exception thrown should not be specific to either
- // the job tracker client provider or the local provider.
- boolean errorThrown = false;
- try {
- Cluster cluster = new Cluster(conf);
- cluster.close();
- fail("Not expected - cluster init should have failed");
- } catch (IOException e) {
- errorThrown = true;
- assert(e.getMessage().contains("Cannot initialize Cluster. Please check"));
- }
- assert(errorThrown);
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.LocalJobRunner;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.junit.Test;
+
+public class TestClientProtocolProviderImpls extends TestCase {
+
+ @Test
+ public void testClusterWithLocalClientProvider() throws Exception {
+
+ Configuration conf = new Configuration();
+
+ try {
+ conf.set(MRConfig.FRAMEWORK_NAME, "incorrect");
+ new Cluster(conf);
+ fail("Cluster should not be initialized with incorrect framework name");
+ } catch (IOException e) {
+ // expected: invalid framework name
+ }
+
+ try {
+ conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
+ conf.set(JTConfig.JT_IPC_ADDRESS, "127.0.0.1:0");
+
+ new Cluster(conf);
+ fail("Cluster with Local Framework name should use local JT address");
+ } catch (IOException e) {
+ // expected: non-local JT address with the local framework
+ }
+
+ try {
+ conf.set(JTConfig.JT_IPC_ADDRESS, "local");
+ Cluster cluster = new Cluster(conf);
+ assertTrue(cluster.getClient() instanceof LocalJobRunner);
+ cluster.close();
+ } catch (IOException e) {
+
+ }
+ }
+
+ @Test
+ public void testClusterWithJTClientProvider() throws Exception {
+
+ Configuration conf = new Configuration();
+ try {
+ conf.set(MRConfig.FRAMEWORK_NAME, "incorrect");
+ new Cluster(conf);
+ fail("Cluster should not be initialized with incorrect framework name");
+
+ } catch (IOException e) {
+ // expected: invalid framework name
+ }
+
+ try {
+ conf.set(MRConfig.FRAMEWORK_NAME, "classic");
+ conf.set(JTConfig.JT_IPC_ADDRESS, "local");
+ new Cluster(conf);
+ fail("Cluster with classic Framework name shouldnot use local JT address");
+
+ } catch (IOException e) {
+ // expected: local JT address with the classic framework
+ }
+
+ try {
+ conf = new Configuration();
+ conf.set(MRConfig.FRAMEWORK_NAME, "classic");
+ conf.set(JTConfig.JT_IPC_ADDRESS, "127.0.0.1:0");
+ Cluster cluster = new Cluster(conf);
+ cluster.close();
+ } catch (IOException e) {
+
+ }
+ }
+
+ @Test
+ public void testClusterException() {
+
+ Configuration conf = new Configuration();
+ conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME);
+ conf.set(JTConfig.JT_IPC_ADDRESS, "local");
+
+ // Initializing a cluster with this conf should throw an error.
+ // However, the exception thrown should not be specific to either
+ // the job tracker client provider or the local provider.
+ boolean errorThrown = false;
+ try {
+ Cluster cluster = new Cluster(conf);
+ cluster.close();
+ fail("Not expected - cluster init should have failed");
+ } catch (IOException e) {
+ errorThrown = true;
+ assert(e.getMessage().contains("Cannot initialize Cluster. Please check"));
+ }
+ assert(errorThrown);
+ }
+}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java?rev=1401071&r1=1401070&r2=1401071&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java Mon Oct 22 20:43:16 2012
@@ -1,129 +1,129 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapreduce;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.LocalJobRunner;
-import org.apache.hadoop.mapred.ResourceMgrDelegate;
-import org.apache.hadoop.mapred.YARNRunner;
-import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
-import org.apache.hadoop.yarn.api.records.DelegationToken;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.junit.Test;
-
-public class TestYarnClientProtocolProvider extends TestCase {
-
- private static final RecordFactory recordFactory = RecordFactoryProvider.
- getRecordFactory(null);
-
- @Test
- public void testClusterWithYarnClientProtocolProvider() throws Exception {
-
- Configuration conf = new Configuration(false);
- Cluster cluster = null;
-
- try {
- cluster = new Cluster(conf);
- } catch (Exception e) {
- throw new Exception(
- "Failed to initialize a local runner w/o a cluster framework key", e);
- }
-
- try {
- assertTrue("client is not a LocalJobRunner",
- cluster.getClient() instanceof LocalJobRunner);
- } finally {
- if (cluster != null) {
- cluster.close();
- }
- }
-
- try {
- conf = new Configuration();
- conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
- cluster = new Cluster(conf);
- ClientProtocol client = cluster.getClient();
- assertTrue("client is a YARNRunner", client instanceof YARNRunner);
- } catch (IOException e) {
-
- } finally {
- if (cluster != null) {
- cluster.close();
- }
- }
- }
-
-
- @Test
- public void testClusterGetDelegationToken() throws Exception {
-
- Configuration conf = new Configuration(false);
- Cluster cluster = null;
- try {
- conf = new Configuration();
- conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
- cluster = new Cluster(conf);
- YARNRunner yrunner = (YARNRunner) cluster.getClient();
- GetDelegationTokenResponse getDTResponse =
- recordFactory.newRecordInstance(GetDelegationTokenResponse.class);
- DelegationToken rmDTToken = recordFactory.newRecordInstance(
- DelegationToken.class);
- rmDTToken.setIdentifier(ByteBuffer.wrap(new byte[2]));
- rmDTToken.setKind("Testclusterkind");
- rmDTToken.setPassword(ByteBuffer.wrap("testcluster".getBytes()));
- rmDTToken.setService("0.0.0.0:8032");
- getDTResponse.setRMDelegationToken(rmDTToken);
- final ClientRMProtocol cRMProtocol = mock(ClientRMProtocol.class);
- when(cRMProtocol.getDelegationToken(any(
- GetDelegationTokenRequest.class))).thenReturn(getDTResponse);
- ResourceMgrDelegate rmgrDelegate = new ResourceMgrDelegate(
- new YarnConfiguration(conf)) {
- @Override
- public synchronized void start() {
- this.rmClient = cRMProtocol;
- }
- };
- yrunner.setResourceMgrDelegate(rmgrDelegate);
- Token t = cluster.getDelegationToken(new Text(" "));
- assertTrue("Token kind is instead " + t.getKind().toString(),
- "Testclusterkind".equals(t.getKind().toString()));
- } finally {
- if (cluster != null) {
- cluster.close();
- }
- }
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.LocalJobRunner;
+import org.apache.hadoop.mapred.ResourceMgrDelegate;
+import org.apache.hadoop.mapred.YARNRunner;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.records.DelegationToken;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.junit.Test;
+
+public class TestYarnClientProtocolProvider extends TestCase {
+
+ private static final RecordFactory recordFactory = RecordFactoryProvider.
+ getRecordFactory(null);
+
+ @Test
+ public void testClusterWithYarnClientProtocolProvider() throws Exception {
+
+ Configuration conf = new Configuration(false);
+ Cluster cluster = null;
+
+ try {
+ cluster = new Cluster(conf);
+ } catch (Exception e) {
+ throw new Exception(
+ "Failed to initialize a local runner w/o a cluster framework key", e);
+ }
+
+ try {
+ assertTrue("client is not a LocalJobRunner",
+ cluster.getClient() instanceof LocalJobRunner);
+ } finally {
+ if (cluster != null) {
+ cluster.close();
+ }
+ }
+
+ try {
+ conf = new Configuration();
+ conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+ cluster = new Cluster(conf);
+ ClientProtocol client = cluster.getClient();
+ assertTrue("client is a YARNRunner", client instanceof YARNRunner);
+ } catch (IOException e) {
+
+ } finally {
+ if (cluster != null) {
+ cluster.close();
+ }
+ }
+ }
+
+
+ @Test
+ public void testClusterGetDelegationToken() throws Exception {
+
+ Configuration conf = new Configuration(false);
+ Cluster cluster = null;
+ try {
+ conf = new Configuration();
+ conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+ cluster = new Cluster(conf);
+ YARNRunner yrunner = (YARNRunner) cluster.getClient();
+ GetDelegationTokenResponse getDTResponse =
+ recordFactory.newRecordInstance(GetDelegationTokenResponse.class);
+ DelegationToken rmDTToken = recordFactory.newRecordInstance(
+ DelegationToken.class);
+ rmDTToken.setIdentifier(ByteBuffer.wrap(new byte[2]));
+ rmDTToken.setKind("Testclusterkind");
+ rmDTToken.setPassword(ByteBuffer.wrap("testcluster".getBytes()));
+ rmDTToken.setService("0.0.0.0:8032");
+ getDTResponse.setRMDelegationToken(rmDTToken);
+ final ClientRMProtocol cRMProtocol = mock(ClientRMProtocol.class);
+ when(cRMProtocol.getDelegationToken(any(
+ GetDelegationTokenRequest.class))).thenReturn(getDTResponse);
+ ResourceMgrDelegate rmgrDelegate = new ResourceMgrDelegate(
+ new YarnConfiguration(conf)) {
+ @Override
+ public synchronized void start() {
+ this.rmClient = cRMProtocol;
+ }
+ };
+ yrunner.setResourceMgrDelegate(rmgrDelegate);
+ Token t = cluster.getDelegationToken(new Text(" "));
+ assertTrue("Token kind is instead " + t.getKind().toString(),
+ "Testclusterkind".equals(t.getKind().toString()));
+ } finally {
+ if (cluster != null) {
+ cluster.close();
+ }
+ }
+ }
+
+}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java?rev=1401071&r1=1401070&r2=1401071&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java Mon Oct 22 20:43:16 2012
@@ -317,7 +317,7 @@ public class TestCombineFileInputFormat
for (InputSplit split : splits) {
System.out.println("File split(Test0): " + split);
}
- assertEquals(splits.size(), 1);
+ assertEquals(1, splits.size());
CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
assertEquals(2, fileSplit.getNumPaths());
assertEquals(1, fileSplit.getLocations().length);
@@ -347,24 +347,24 @@ public class TestCombineFileInputFormat
for (InputSplit split : splits) {
System.out.println("File split(Test1): " + split);
}
- assertEquals(splits.size(), 2);
+ assertEquals(2, splits.size());
fileSplit = (CombineFileSplit) splits.get(0);
- assertEquals(fileSplit.getNumPaths(), 2);
- assertEquals(fileSplit.getLocations().length, 1);
- assertEquals(fileSplit.getPath(0).getName(), file2.getName());
- assertEquals(fileSplit.getOffset(0), 0);
- assertEquals(fileSplit.getLength(0), BLOCKSIZE);
- assertEquals(fileSplit.getPath(1).getName(), file2.getName());
- assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
- assertEquals(fileSplit.getLength(1), BLOCKSIZE);
- assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
- fileSplit = (CombineFileSplit) splits.get(1);
- assertEquals(fileSplit.getNumPaths(), 1);
- assertEquals(fileSplit.getLocations().length, 1);
- assertEquals(fileSplit.getPath(0).getName(), file1.getName());
- assertEquals(fileSplit.getOffset(0), 0);
- assertEquals(fileSplit.getLength(0), BLOCKSIZE);
- assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(file2.getName(), fileSplit.getPath(1).getName());
+ assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+ fileSplit = (CombineFileSplit) splits.get(1);
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
// create another file on 3 datanodes and 3 racks.
dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
@@ -378,37 +378,37 @@ public class TestCombineFileInputFormat
for (InputSplit split : splits) {
System.out.println("File split(Test2): " + split);
}
- assertEquals(splits.size(), 3);
+ assertEquals(3, splits.size());
fileSplit = (CombineFileSplit) splits.get(0);
- assertEquals(fileSplit.getNumPaths(), 3);
- assertEquals(fileSplit.getLocations().length, 1);
- assertEquals(fileSplit.getPath(0).getName(), file3.getName());
- assertEquals(fileSplit.getOffset(0), 0);
- assertEquals(fileSplit.getLength(0), BLOCKSIZE);
- assertEquals(fileSplit.getPath(1).getName(), file3.getName());
- assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
- assertEquals(fileSplit.getLength(1), BLOCKSIZE);
- assertEquals(fileSplit.getPath(2).getName(), file3.getName());
- assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
- assertEquals(fileSplit.getLength(2), BLOCKSIZE);
- assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3
- fileSplit = (CombineFileSplit) splits.get(1);
- assertEquals(fileSplit.getNumPaths(), 2);
- assertEquals(fileSplit.getLocations().length, 1);
- assertEquals(fileSplit.getPath(0).getName(), file2.getName());
- assertEquals(fileSplit.getOffset(0), 0);
- assertEquals(fileSplit.getLength(0), BLOCKSIZE);
- assertEquals(fileSplit.getPath(1).getName(), file2.getName());
- assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
- assertEquals(fileSplit.getLength(1), BLOCKSIZE);
- assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
- fileSplit = (CombineFileSplit) splits.get(2);
- assertEquals(fileSplit.getNumPaths(), 1);
- assertEquals(fileSplit.getLocations().length, 1);
- assertEquals(fileSplit.getPath(0).getName(), file1.getName());
- assertEquals(fileSplit.getOffset(0), 0);
- assertEquals(fileSplit.getLength(0), BLOCKSIZE);
- assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
+ assertEquals(3, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(file3.getName(), fileSplit.getPath(1).getName());
+ assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+ assertEquals(file3.getName(), fileSplit.getPath(2).getName());
+ assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(2));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(2));
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+ fileSplit = (CombineFileSplit) splits.get(1);
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(file2.getName(), fileSplit.getPath(1).getName());
+ assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+ fileSplit = (CombineFileSplit) splits.get(2);
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
// create file4 on all three racks
Path file4 = new Path(dir4 + "/file4");
@@ -420,37 +420,37 @@ public class TestCombineFileInputFormat
for (InputSplit split : splits) {
System.out.println("File split(Test3): " + split);
}
- assertEquals(splits.size(), 3);
+ assertEquals(3, splits.size());
fileSplit = (CombineFileSplit) splits.get(0);
- assertEquals(fileSplit.getNumPaths(), 6);
- assertEquals(fileSplit.getLocations().length, 1);
- assertEquals(fileSplit.getPath(0).getName(), file3.getName());
- assertEquals(fileSplit.getOffset(0), 0);
- assertEquals(fileSplit.getLength(0), BLOCKSIZE);
- assertEquals(fileSplit.getPath(1).getName(), file3.getName());
- assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
- assertEquals(fileSplit.getLength(1), BLOCKSIZE);
- assertEquals(fileSplit.getPath(2).getName(), file3.getName());
- assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
- assertEquals(fileSplit.getLength(2), BLOCKSIZE);
- assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3
- fileSplit = (CombineFileSplit) splits.get(1);
- assertEquals(fileSplit.getNumPaths(), 2);
- assertEquals(fileSplit.getLocations().length, 1);
- assertEquals(fileSplit.getPath(0).getName(), file2.getName());
- assertEquals(fileSplit.getOffset(0), 0);
- assertEquals(fileSplit.getLength(0), BLOCKSIZE);
- assertEquals(fileSplit.getPath(1).getName(), file2.getName());
- assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
- assertEquals(fileSplit.getLength(1), BLOCKSIZE);
- assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
- fileSplit = (CombineFileSplit) splits.get(2);
- assertEquals(fileSplit.getNumPaths(), 1);
- assertEquals(fileSplit.getLocations().length, 1);
- assertEquals(fileSplit.getPath(0).getName(), file1.getName());
- assertEquals(fileSplit.getOffset(0), 0);
- assertEquals(fileSplit.getLength(0), BLOCKSIZE);
- assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
+ assertEquals(6, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(file3.getName(), fileSplit.getPath(1).getName());
+ assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+ assertEquals(file3.getName(), fileSplit.getPath(2).getName());
+ assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(2));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(2));
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+ fileSplit = (CombineFileSplit) splits.get(1);
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(file2.getName(), fileSplit.getPath(1).getName());
+ assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+ fileSplit = (CombineFileSplit) splits.get(2);
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
// maximum split size is 2 blocks
inFormat = new DummyInputFormat();
@@ -462,35 +462,35 @@ public class TestCombineFileInputFormat
for (InputSplit split : splits) {
System.out.println("File split(Test4): " + split);
}
- assertEquals(splits.size(), 5);
+ assertEquals(5, splits.size());
fileSplit = (CombineFileSplit) splits.get(0);
- assertEquals(fileSplit.getNumPaths(), 2);
- assertEquals(fileSplit.getLocations().length, 1);
- assertEquals(fileSplit.getPath(0).getName(), file3.getName());
- assertEquals(fileSplit.getOffset(0), 0);
- assertEquals(fileSplit.getLength(0), BLOCKSIZE);
- assertEquals(fileSplit.getPath(1).getName(), file3.getName());
- assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
- assertEquals(fileSplit.getLength(1), BLOCKSIZE);
- assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
- fileSplit = (CombineFileSplit) splits.get(1);
- assertEquals(fileSplit.getPath(0).getName(), file3.getName());
- assertEquals(fileSplit.getOffset(0), 2 * BLOCKSIZE);
- assertEquals(fileSplit.getLength(0), BLOCKSIZE);
- assertEquals(fileSplit.getPath(1).getName(), file4.getName());
- assertEquals(fileSplit.getOffset(1), 0);
- assertEquals(fileSplit.getLength(1), BLOCKSIZE);
- assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
- fileSplit = (CombineFileSplit) splits.get(2);
- assertEquals(fileSplit.getNumPaths(), 2);
- assertEquals(fileSplit.getLocations().length, 1);
- assertEquals(fileSplit.getPath(0).getName(), file4.getName());
- assertEquals(fileSplit.getOffset(0), BLOCKSIZE);
- assertEquals(fileSplit.getLength(0), BLOCKSIZE);
- assertEquals(fileSplit.getPath(1).getName(), file4.getName());
- assertEquals(fileSplit.getOffset(1), 2 * BLOCKSIZE);
- assertEquals(fileSplit.getLength(1), BLOCKSIZE);
- assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(file3.getName(), fileSplit.getPath(1).getName());
+ assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+ assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
+ fileSplit = (CombineFileSplit) splits.get(1);
+ assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+ assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+ assertEquals(0, fileSplit.getOffset(1));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+ assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
+ fileSplit = (CombineFileSplit) splits.get(2);
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file4.getName(), fileSplit.getPath(0).getName());
+ assertEquals(BLOCKSIZE, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+ assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(1));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+ assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
// maximum split size is 3 blocks
inFormat = new DummyInputFormat();
@@ -502,48 +502,48 @@ public class TestCombineFileInputFormat
for (InputSplit split : splits) {
System.out.println("File split(Test5): " + split);
}
- assertEquals(splits.size(), 4);
+ assertEquals(4, splits.size());
fileSplit = (CombineFileSplit) splits.get(0);
- assertEquals(fileSplit.getNumPaths(), 3);
- assertEquals(fileSplit.getLocations().length, 1);
- assertEquals(fileSplit.getPath(0).getName(), file3.getName());
- assertEquals(fileSplit.getOffset(0), 0);
- assertEquals(fileSplit.getLength(0), BLOCKSIZE);
- assertEquals(fileSplit.getPath(1).getName(), file3.getName());
- assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
- assertEquals(fileSplit.getLength(1), BLOCKSIZE);
- assertEquals(fileSplit.getPath(2).getName(), file3.getName());
- assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
- assertEquals(fileSplit.getLength(2), BLOCKSIZE);
- assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
- fileSplit = (CombineFileSplit) splits.get(1);
- assertEquals(fileSplit.getPath(0).getName(), file4.getName());
- assertEquals(fileSplit.getOffset(0), 0);
- assertEquals(fileSplit.getLength(0), BLOCKSIZE);
- assertEquals(fileSplit.getPath(1).getName(), file4.getName());
- assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
- assertEquals(fileSplit.getLength(1), BLOCKSIZE);
- assertEquals(fileSplit.getPath(2).getName(), file4.getName());
- assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
- assertEquals(fileSplit.getLength(2), BLOCKSIZE);
- assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
- fileSplit = (CombineFileSplit) splits.get(2);
- assertEquals(fileSplit.getNumPaths(), 2);
- assertEquals(fileSplit.getLocations().length, 1);
- assertEquals(fileSplit.getPath(0).getName(), file2.getName());
- assertEquals(fileSplit.getOffset(0), 0);
- assertEquals(fileSplit.getLength(0), BLOCKSIZE);
- assertEquals(fileSplit.getPath(1).getName(), file2.getName());
- assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
- assertEquals(fileSplit.getLength(1), BLOCKSIZE);
- assertEquals(fileSplit.getLocations()[0], "host2.rack2.com");
+ assertEquals(3, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(file3.getName(), fileSplit.getPath(1).getName());
+ assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+ assertEquals(file3.getName(), fileSplit.getPath(2).getName());
+ assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(2));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(2));
+ assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
+ fileSplit = (CombineFileSplit) splits.get(1);
+ assertEquals(file4.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+ assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+ assertEquals(file4.getName(), fileSplit.getPath(2).getName());
+ assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(2));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(2));
+ assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
+ fileSplit = (CombineFileSplit) splits.get(2);
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(file2.getName(), fileSplit.getPath(1).getName());
+ assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+ assertEquals("host2.rack2.com", fileSplit.getLocations()[0]);
fileSplit = (CombineFileSplit) splits.get(3);
- assertEquals(fileSplit.getNumPaths(), 1);
- assertEquals(fileSplit.getLocations().length, 1);
- assertEquals(fileSplit.getPath(0).getName(), file1.getName());
- assertEquals(fileSplit.getOffset(0), 0);
- assertEquals(fileSplit.getLength(0), BLOCKSIZE);
- assertEquals(fileSplit.getLocations()[0], "host1.rack1.com");
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals("host1.rack1.com", fileSplit.getLocations()[0]);
// maximum split size is 4 blocks
inFormat = new DummyInputFormat();
@@ -553,42 +553,42 @@ public class TestCombineFileInputFormat
for (InputSplit split : splits) {
System.out.println("File split(Test6): " + split);
}
- assertEquals(splits.size(), 3);
+ assertEquals(3, splits.size());
fileSplit = (CombineFileSplit) splits.get(0);
- assertEquals(fileSplit.getNumPaths(), 4);
- assertEquals(fileSplit.getLocations().length, 1);
- assertEquals(fileSplit.getPath(0).getName(), file3.getName());
- assertEquals(fileSplit.getOffset(0), 0);
- assertEquals(fileSplit.getLength(0), BLOCKSIZE);
- assertEquals(fileSplit.getPath(1).getName(), file3.getName());
- assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
- assertEquals(fileSplit.getLength(1), BLOCKSIZE);
- assertEquals(fileSplit.getPath(2).getName(), file3.getName());
- assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
- assertEquals(fileSplit.getLength(2), BLOCKSIZE);
- assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
- fileSplit = (CombineFileSplit) splits.get(1);
- assertEquals(fileSplit.getNumPaths(), 4);
- assertEquals(fileSplit.getPath(0).getName(), file2.getName());
- assertEquals(fileSplit.getOffset(0), 0);
- assertEquals(fileSplit.getLength(0), BLOCKSIZE);
- assertEquals(fileSplit.getPath(1).getName(), file2.getName());
- assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
- assertEquals(fileSplit.getLength(1), BLOCKSIZE);
- assertEquals(fileSplit.getPath(2).getName(), file4.getName());
- assertEquals(fileSplit.getOffset(2), BLOCKSIZE);
- assertEquals(fileSplit.getLength(2), BLOCKSIZE);
- assertEquals(fileSplit.getPath(3).getName(), file4.getName());
- assertEquals(fileSplit.getOffset(3), 2 * BLOCKSIZE);
- assertEquals(fileSplit.getLength(3), BLOCKSIZE);
- assertEquals(fileSplit.getLocations()[0], "host2.rack2.com");
- fileSplit = (CombineFileSplit) splits.get(2);
- assertEquals(fileSplit.getNumPaths(), 1);
- assertEquals(fileSplit.getLocations().length, 1);
- assertEquals(fileSplit.getPath(0).getName(), file1.getName());
- assertEquals(fileSplit.getOffset(0), 0);
- assertEquals(fileSplit.getLength(0), BLOCKSIZE);
- assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
+ assertEquals(4, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(file3.getName(), fileSplit.getPath(1).getName());
+ assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+ assertEquals(file3.getName(), fileSplit.getPath(2).getName());
+ assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(2));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(2));
+ assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
+ fileSplit = (CombineFileSplit) splits.get(1);
+ assertEquals(4, fileSplit.getNumPaths());
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(file2.getName(), fileSplit.getPath(1).getName());
+ assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+ assertEquals(file4.getName(), fileSplit.getPath(2).getName());
+ assertEquals(BLOCKSIZE, fileSplit.getOffset(2));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(2));
+ assertEquals(file4.getName(), fileSplit.getPath(3).getName());
+ assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(3));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(3));
+ assertEquals("host2.rack2.com", fileSplit.getLocations()[0]);
+ fileSplit = (CombineFileSplit) splits.get(2);
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
// maximum split size is 7 blocks and min is 3 blocks
inFormat = new DummyInputFormat();
@@ -601,15 +601,15 @@ public class TestCombineFileInputFormat
for (InputSplit split : splits) {
System.out.println("File split(Test7): " + split);
}
- assertEquals(splits.size(), 2);
+ assertEquals(2, splits.size());
fileSplit = (CombineFileSplit) splits.get(0);
- assertEquals(fileSplit.getNumPaths(), 6);
- assertEquals(fileSplit.getLocations().length, 1);
- assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
- fileSplit = (CombineFileSplit) splits.get(1);
- assertEquals(fileSplit.getNumPaths(), 3);
- assertEquals(fileSplit.getLocations().length, 1);
- assertEquals(fileSplit.getLocations()[0], "host1.rack1.com");
+ assertEquals(6, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
+ fileSplit = (CombineFileSplit) splits.get(1);
+ assertEquals(3, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals("host1.rack1.com", fileSplit.getLocations()[0]);
// Rack 1 has file1, file2 and file3 and file4
// Rack 2 has file2 and file3 and file4
@@ -624,19 +624,19 @@ public class TestCombineFileInputFormat
for (InputSplit split : splits) {
System.out.println("File split(Test1): " + split);
}
- assertEquals(splits.size(), 3);
+ assertEquals(3, splits.size());
fileSplit = (CombineFileSplit) splits.get(0);
- assertEquals(fileSplit.getNumPaths(), 2);
- assertEquals(fileSplit.getLocations().length, 1);
- assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
- fileSplit = (CombineFileSplit) splits.get(1);
- assertEquals(fileSplit.getNumPaths(), 1);
- assertEquals(fileSplit.getLocations().length, 1);
- assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
- fileSplit = (CombineFileSplit) splits.get(2);
- assertEquals(fileSplit.getNumPaths(), 6);
- assertEquals(fileSplit.getLocations().length, 1);
- assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+ fileSplit = (CombineFileSplit) splits.get(1);
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+ fileSplit = (CombineFileSplit) splits.get(2);
+ assertEquals(6, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
// measure performance when there are multiple pools and
// many files in each pool.
@@ -669,7 +669,7 @@ public class TestCombineFileInputFormat
for (InputSplit split : splits) {
System.out.println("File split(Test8): " + split);
}
- assertEquals(6, splits.size());
+ assertEquals(splits.size(), 6);
} finally {
if (dfs != null) {
@@ -750,7 +750,7 @@ public class TestCombineFileInputFormat
for (InputSplit split : splits) {
System.out.println("File split(Test0): " + split);
}
- assertEquals(splits.size(), 1);
+ assertEquals(1, splits.size());
CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
assertEquals(2, fileSplit.getNumPaths());
assertEquals(1, fileSplit.getLocations().length);
@@ -1135,7 +1135,7 @@ public class TestCombineFileInputFormat
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, "test");
List<InputSplit> splits = inFormat.getSplits(job);
- assertEquals(splits.size(), 1);
+ assertEquals(1, splits.size());
CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
assertEquals(1, fileSplit.getNumPaths());
assertEquals(file.getName(), fileSplit.getPath(0).getName());
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMean.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMean.java?rev=1401071&r1=1401070&r2=1401071&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMean.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMean.java Mon Oct 22 20:43:16 2012
@@ -1,196 +1,196 @@
-package org.apache.hadoop.examples;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.StringTokenizer;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-public class WordMean extends Configured implements Tool {
-
- private double mean = 0;
-
- private final static Text COUNT = new Text("count");
- private final static Text LENGTH = new Text("length");
- private final static LongWritable ONE = new LongWritable(1);
-
- /**
- * Maps words from a line of text into 2 key-value pairs; one key-value pair for
- * counting the word, another for counting its length.
- */
- public static class WordMeanMapper extends
- Mapper<Object, Text, Text, LongWritable> {
-
- private LongWritable wordLen = new LongWritable();
-
- /**
- * Emits 2 key-value pairs for counting the word and its length. Outputs are
- * (Text, LongWritable).
- *
- * @param value
- * This will be a line of text coming in from our input file.
- */
- public void map(Object key, Text value, Context context)
- throws IOException, InterruptedException {
- StringTokenizer itr = new StringTokenizer(value.toString());
- while (itr.hasMoreTokens()) {
- String string = itr.nextToken();
- this.wordLen.set(string.length());
- context.write(LENGTH, this.wordLen);
- context.write(COUNT, ONE);
- }
- }
- }
-
- /**
- * Performs integer summation of all the values for each key.
- */
- public static class WordMeanReducer extends
- Reducer<Text, LongWritable, Text, LongWritable> {
-
- private LongWritable sum = new LongWritable();
-
- /**
- * Sums all the individual values within the iterator and writes them to the
- * same key.
- *
- * @param key
- * This will be one of 2 constants: LENGTH or COUNT.
- * @param values
- * This will be an iterator of all the values associated with that
- * key.
- */
- public void reduce(Text key, Iterable<LongWritable> values, Context context)
- throws IOException, InterruptedException {
-
- int theSum = 0;
- for (LongWritable val : values) {
- theSum += val.get();
- }
- sum.set(theSum);
- context.write(key, sum);
- }
- }
-
- /**
- * Reads the output file and parses the summation of lengths, and the word
- * count, to perform a quick calculation of the mean.
- *
- * @param path
- * The path to find the output file in. Set in main to the output
- * directory.
- * @throws IOException
- * If the output directory cannot be accessed.
- */
- private double readAndCalcMean(Path path, Configuration conf)
- throws IOException {
- FileSystem fs = FileSystem.get(conf);
- Path file = new Path(path, "part-r-00000");
-
- if (!fs.exists(file))
- throw new IOException("Output not found!");
-
- BufferedReader br = null;
-
- // average = total sum / number of elements;
- try {
- br = new BufferedReader(new InputStreamReader(fs.open(file)));
-
- long count = 0;
- long length = 0;
-
- String line;
- while ((line = br.readLine()) != null) {
- StringTokenizer st = new StringTokenizer(line);
-
- // grab type
- String type = st.nextToken();
-
- // differentiate
- if (type.equals(COUNT.toString())) {
- String countLit = st.nextToken();
- count = Long.parseLong(countLit);
- } else if (type.equals(LENGTH.toString())) {
- String lengthLit = st.nextToken();
- length = Long.parseLong(lengthLit);
- }
- }
-
- double theMean = (((double) length) / ((double) count));
- System.out.println("The mean is: " + theMean);
- return theMean;
- } finally {
- if (br != null) br.close();
- }
- }
-
- public static void main(String[] args) throws Exception {
- ToolRunner.run(new Configuration(), new WordMean(), args);
- }
-
- @Override
- public int run(String[] args) throws Exception {
- if (args.length != 2) {
- System.err.println("Usage: wordmean <in> <out>");
- return 0;
- }
-
- Configuration conf = getConf();
-
- @SuppressWarnings("deprecation")
- Job job = new Job(conf, "word mean");
- job.setJarByClass(WordMean.class);
- job.setMapperClass(WordMeanMapper.class);
- job.setCombinerClass(WordMeanReducer.class);
- job.setReducerClass(WordMeanReducer.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(LongWritable.class);
- FileInputFormat.addInputPath(job, new Path(args[0]));
- Path outputpath = new Path(args[1]);
- FileOutputFormat.setOutputPath(job, outputpath);
- boolean result = job.waitForCompletion(true);
- mean = readAndCalcMean(outputpath, conf);
-
- return (result ? 0 : 1);
- }
-
- /**
- * Only valid after run() has been called.
- *
- * @return Returns the mean value.
- */
- public double getMean() {
- return mean;
- }
+package org.apache.hadoop.examples;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class WordMean extends Configured implements Tool {
+
+ private double mean = 0;
+
+ private final static Text COUNT = new Text("count");
+ private final static Text LENGTH = new Text("length");
+ private final static LongWritable ONE = new LongWritable(1);
+
+ /**
+ * Maps words from a line of text into 2 key-value pairs; one key-value pair for
+ * counting the word, another for counting its length.
+ */
+ public static class WordMeanMapper extends
+ Mapper<Object, Text, Text, LongWritable> {
+
+ private LongWritable wordLen = new LongWritable();
+
+ /**
+ * Emits 2 key-value pairs for counting the word and its length. Outputs are
+ * (Text, LongWritable).
+ *
+ * @param value
+ * This will be a line of text coming in from our input file.
+ */
+ public void map(Object key, Text value, Context context)
+ throws IOException, InterruptedException {
+ StringTokenizer itr = new StringTokenizer(value.toString());
+ while (itr.hasMoreTokens()) {
+ String string = itr.nextToken();
+ this.wordLen.set(string.length());
+ context.write(LENGTH, this.wordLen);
+ context.write(COUNT, ONE);
+ }
+ }
+ }
+
+ /**
+ * Performs integer summation of all the values for each key.
+ */
+ public static class WordMeanReducer extends
+ Reducer<Text, LongWritable, Text, LongWritable> {
+
+ private LongWritable sum = new LongWritable();
+
+ /**
+ * Sums all the individual values within the iterator and writes them to the
+ * same key.
+ *
+ * @param key
+ * This will be one of 2 constants: LENGTH or COUNT.
+ * @param values
+ * This will be an iterator of all the values associated with that
+ * key.
+ */
+ public void reduce(Text key, Iterable<LongWritable> values, Context context)
+ throws IOException, InterruptedException {
+
+ int theSum = 0;
+ for (LongWritable val : values) {
+ theSum += val.get();
+ }
+ sum.set(theSum);
+ context.write(key, sum);
+ }
+ }
+
+ /**
+ * Reads the output file and parses the summation of lengths, and the word
+ * count, to perform a quick calculation of the mean.
+ *
+ * @param path
+ * The path to find the output file in. Set in main to the output
+ * directory.
+ * @throws IOException
+ * If the output directory cannot be accessed.
+ */
+ private double readAndCalcMean(Path path, Configuration conf)
+ throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ Path file = new Path(path, "part-r-00000");
+
+ if (!fs.exists(file))
+ throw new IOException("Output not found!");
+
+ BufferedReader br = null;
+
+ // average = total sum / number of elements;
+ try {
+ br = new BufferedReader(new InputStreamReader(fs.open(file)));
+
+ long count = 0;
+ long length = 0;
+
+ String line;
+ while ((line = br.readLine()) != null) {
+ StringTokenizer st = new StringTokenizer(line);
+
+ // grab type
+ String type = st.nextToken();
+
+ // differentiate
+ if (type.equals(COUNT.toString())) {
+ String countLit = st.nextToken();
+ count = Long.parseLong(countLit);
+ } else if (type.equals(LENGTH.toString())) {
+ String lengthLit = st.nextToken();
+ length = Long.parseLong(lengthLit);
+ }
+ }
+
+ double theMean = (((double) length) / ((double) count));
+ System.out.println("The mean is: " + theMean);
+ return theMean;
+ } finally {
+ if (br != null) br.close();
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new Configuration(), new WordMean(), args);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ if (args.length != 2) {
+ System.err.println("Usage: wordmean <in> <out>");
+ return 0;
+ }
+
+ Configuration conf = getConf();
+
+ @SuppressWarnings("deprecation")
+ Job job = new Job(conf, "word mean");
+ job.setJarByClass(WordMean.class);
+ job.setMapperClass(WordMeanMapper.class);
+ job.setCombinerClass(WordMeanReducer.class);
+ job.setReducerClass(WordMeanReducer.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(LongWritable.class);
+ FileInputFormat.addInputPath(job, new Path(args[0]));
+ Path outputpath = new Path(args[1]);
+ FileOutputFormat.setOutputPath(job, outputpath);
+ boolean result = job.waitForCompletion(true);
+ mean = readAndCalcMean(outputpath, conf);
+
+ return (result ? 0 : 1);
+ }
+
+ /**
+ * Only valid after run() has been called.
+ *
+ * @return The mean word length.
+ */
+ public double getMean() {
+ return mean;
+ }
}
\ No newline at end of file
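
A quick way to sanity-check the client-side arithmetic in readAndCalcMean is to replay it against an in-memory copy of the expected part-r-00000 contents. The sketch below (not part of the commit) assumes the COUNT and LENGTH constants defined earlier in WordMean.java serialize as the literal words "count" and "length"; the class name SampleMeanCheck and the sample figures are illustrative only.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.StringTokenizer;

public class SampleMeanCheck {
  public static void main(String[] args) throws IOException {
    // Stand-in for part-r-00000: "the quick brown fox jumps" yields
    // count = 5 words and length = 21 characters.
    BufferedReader br = new BufferedReader(
        new StringReader("count\t5\nlength\t21\n"));
    long count = 0;
    long length = 0;
    String line;
    while ((line = br.readLine()) != null) {
      StringTokenizer st = new StringTokenizer(line);
      String type = st.nextToken();
      long val = Long.parseLong(st.nextToken());
      if (type.equals("count")) {
        count = val;
      } else if (type.equals("length")) {
        length = val;
      }
    }
    br.close();
    System.out.println("The mean is: " + (double) length / count); // 4.2
  }
}

Compiled and run on its own, this prints 4.2, matching the hand-computed mean word length of "the quick brown fox jumps".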
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMedian.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMedian.java?rev=1401071&r1=1401070&r2=1401071&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMedian.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMedian.java Mon Oct 22 20:43:16 2012
@@ -1,208 +1,208 @@
-package org.apache.hadoop.examples;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.StringTokenizer;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskCounter;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-public class WordMedian extends Configured implements Tool {
-
- private double median = 0;
- private final static IntWritable ONE = new IntWritable(1);
-
- /**
- * Maps words from line of text into a key-value pair; the length of the word
- * as the key, and 1 as the value.
- */
- public static class WordMedianMapper extends
- Mapper<Object, Text, IntWritable, IntWritable> {
-
- private IntWritable length = new IntWritable();
-
- /**
- * Emits a key-value pair for counting the word. Outputs are (IntWritable,
- * IntWritable).
- *
- * @param value
- * This will be a line of text coming in from our input file.
- */
- public void map(Object key, Text value, Context context)
- throws IOException, InterruptedException {
- StringTokenizer itr = new StringTokenizer(value.toString());
- while (itr.hasMoreTokens()) {
- String string = itr.nextToken();
- length.set(string.length());
- context.write(length, ONE);
- }
- }
- }
-
- /**
- * Performs integer summation of all the values for each key.
- */
- public static class WordMedianReducer extends
- Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
-
- private IntWritable val = new IntWritable();
-
- /**
- * Sums all the individual values within the iterator and writes them to the
- * same key.
- *
- * @param key
- * This will be a length of a word that was read.
- * @param values
- * This will be an iterator of all the values associated with that
- * key.
- */
- public void reduce(IntWritable key, Iterable<IntWritable> values,
- Context context) throws IOException, InterruptedException {
-
- int sum = 0;
- for (IntWritable value : values) {
- sum += value.get();
- }
- val.set(sum);
- context.write(key, val);
- }
- }
-
- /**
- * This is a standard program to read and find a median value based on a file
- * of word counts such as: 1 456, 2 132, 3 56... Where the first values are
- * the word lengths and the following values are the number of times that
- * words of that length appear.
- *
- * @param path
- * The path to read the HDFS file from (part-r-00000...00001...etc).
- * @param medianIndex1
- * The first length value to look for.
- * @param medianIndex2
- * The second length value to look for (will be the same as the first
- * if there are an even number of words total).
- * @throws IOException
- * If file cannot be found, we throw an exception.
- * */
- private double readAndFindMedian(String path, int medianIndex1,
- int medianIndex2, Configuration conf) throws IOException {
- FileSystem fs = FileSystem.get(conf);
- Path file = new Path(path, "part-r-00000");
-
- if (!fs.exists(file))
- throw new IOException("Output not found!");
-
- BufferedReader br = null;
-
- try {
- br = new BufferedReader(new InputStreamReader(fs.open(file)));
- int num = 0;
-
- String line;
- while ((line = br.readLine()) != null) {
- StringTokenizer st = new StringTokenizer(line);
-
- // grab length
- String currLen = st.nextToken();
-
- // grab count
- String lengthFreq = st.nextToken();
-
- int prevNum = num;
- num += Integer.parseInt(lengthFreq);
-
- if (medianIndex2 >= prevNum && medianIndex1 <= num) {
- System.out.println("The median is: " + currLen);
- br.close();
- return Double.parseDouble(currLen);
- } else if (medianIndex2 >= prevNum && medianIndex1 < num) {
- String nextCurrLen = st.nextToken();
- double theMedian = (Integer.parseInt(currLen) + Integer
- .parseInt(nextCurrLen)) / 2.0;
- System.out.println("The median is: " + theMedian);
- br.close();
- return theMedian;
- }
- }
- } finally {
- br.close();
- }
- // error, no median found
- return -1;
- }
-
- public static void main(String[] args) throws Exception {
- ToolRunner.run(new Configuration(), new WordMedian(), args);
- }
-
- @Override
- public int run(String[] args) throws Exception {
- if (args.length != 2) {
- System.err.println("Usage: wordmedian <in> <out>");
- return 0;
- }
-
- setConf(new Configuration());
- Configuration conf = getConf();
-
- @SuppressWarnings("deprecation")
- Job job = new Job(conf, "word median");
- job.setJarByClass(WordMedian.class);
- job.setMapperClass(WordMedianMapper.class);
- job.setCombinerClass(WordMedianReducer.class);
- job.setReducerClass(WordMedianReducer.class);
- job.setOutputKeyClass(IntWritable.class);
- job.setOutputValueClass(IntWritable.class);
- FileInputFormat.addInputPath(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- boolean result = job.waitForCompletion(true);
-
- // Wait for JOB 1 -- get middle value to check for Median
-
- long totalWords = job.getCounters()
- .getGroup(TaskCounter.class.getCanonicalName())
- .findCounter("MAP_OUTPUT_RECORDS", "Map output records").getValue();
- int medianIndex1 = (int) Math.ceil((totalWords / 2.0));
- int medianIndex2 = (int) Math.floor((totalWords / 2.0));
-
- median = readAndFindMedian(args[1], medianIndex1, medianIndex2, conf);
-
- return (result ? 0 : 1);
- }
-
- public double getMedian() {
- return median;
- }
-}
+package org.apache.hadoop.examples;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class WordMedian extends Configured implements Tool {
+
+ private double median = 0;
+ private final static IntWritable ONE = new IntWritable(1);
+
+ /**
+ * Maps each word in a line of text to a key-value pair: the length of the
+ * word as the key, and 1 as the value.
+ */
+ public static class WordMedianMapper extends
+ Mapper<Object, Text, IntWritable, IntWritable> {
+
+ private IntWritable length = new IntWritable();
+
+ /**
+ * Emits one key-value pair per word, keyed by the word's length. Outputs
+ * are (IntWritable, IntWritable).
+ *
+ * @param value
+ * This will be a line of text coming in from our input file.
+ */
+ public void map(Object key, Text value, Context context)
+ throws IOException, InterruptedException {
+ StringTokenizer itr = new StringTokenizer(value.toString());
+ while (itr.hasMoreTokens()) {
+ String string = itr.nextToken();
+ length.set(string.length());
+ context.write(length, ONE);
+ }
+ }
+ }
+
+ /**
+ * Performs integer summation of all the values for each key.
+ */
+ public static class WordMedianReducer extends
+ Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
+
+ private IntWritable val = new IntWritable();
+
+ /**
+ * Sums all the individual values within the iterator and writes them to the
+ * same key.
+ *
+ * @param key
+ * This will be a length of a word that was read.
+ * @param values
+ * This will be an iterator of all the values associated with that
+ * key.
+ */
+ public void reduce(IntWritable key, Iterable<IntWritable> values,
+ Context context) throws IOException, InterruptedException {
+
+ int sum = 0;
+ for (IntWritable value : values) {
+ sum += value.get();
+ }
+ val.set(sum);
+ context.write(key, val);
+ }
+ }
+
+ /**
+ * Reads a file of word-length counts such as: 1 456, 2 132, 3 56...,
+ * where the first value on each line is a word length and the second is
+ * the number of times that words of that length appear, and finds the
+ * median from it.
+ *
+ * @param path
+ * The path to read the HDFS file from (part-r-00000...00001...etc).
+ * @param medianIndex1
+ * The 1-based position of the median element: ceil(total/2).
+ * @param medianIndex2
+ * The companion middle position, floor(total/2); equal to the first
+ * when there is an even number of words in total.
+ * @throws IOException
+ * If the output file cannot be found.
+ */
+ private double readAndFindMedian(String path, int medianIndex1,
+ int medianIndex2, Configuration conf) throws IOException {
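+ // Worked example for the table in the javadoc: 456 + 132 + 56 = 644
+ // words, so the middle positions are 322 and 323; both fall in the
+ // length-1 bucket (cumulative count 456), giving a median of 1.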
+ FileSystem fs = FileSystem.get(conf);
+ Path file = new Path(path, "part-r-00000");
+
+ if (!fs.exists(file))
+ throw new IOException("Output not found!");
+
+ BufferedReader br = null;
+
+ try {
+ br = new BufferedReader(new InputStreamReader(fs.open(file)));
+ int num = 0;
+
+ String line;
+ while ((line = br.readLine()) != null) {
+ StringTokenizer st = new StringTokenizer(line);
+
+ // grab length
+ String currLen = st.nextToken();
+
+ // grab count
+ String lengthFreq = st.nextToken();
+
+ int prevNum = num;
+ num += Integer.parseInt(lengthFreq);
+
+ // hi is the position of the second middle element; it differs from
+ // medianIndex1 only when the total number of words is even.
+ int hi = (medianIndex1 == medianIndex2) ? medianIndex1 + 1 : medianIndex1;
+ if (medianIndex1 > prevNum && medianIndex1 <= num) {
+ if (hi <= num) {
+ System.out.println("The median is: " + currLen);
+ return Double.parseDouble(currLen);
+ }
+ // The middle pair straddles this bucket: the next length is the
+ // first token of the following output line.
+ String nextCurrLen = new StringTokenizer(br.readLine()).nextToken();
+ double theMedian = (Integer.parseInt(currLen) + Integer
+ .parseInt(nextCurrLen)) / 2.0;
+ System.out.println("The median is: " + theMedian);
+ return theMedian;
+ }
+ }
+ } finally {
+ // fs.open may have thrown before br was assigned; guard against null.
+ if (br != null) {
+ br.close();
+ }
+ }
+ // error, no median found
+ return -1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ System.exit(ToolRunner.run(new Configuration(), new WordMedian(), args));
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ if (args.length != 2) {
+ System.err.println("Usage: wordmedian <in> <out>");
+ return 2; // non-zero: bad usage should not report success
+ }
+
+ // Use the configuration installed by ToolRunner; replacing it here
+ // would discard any generic options parsed from the command line.
+ Configuration conf = getConf();
+
+ @SuppressWarnings("deprecation")
+ Job job = new Job(conf, "word median");
+ job.setJarByClass(WordMedian.class);
+ job.setMapperClass(WordMedianMapper.class);
+ job.setCombinerClass(WordMedianReducer.class);
+ job.setReducerClass(WordMedianReducer.class);
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(IntWritable.class);
+ FileInputFormat.addInputPath(job, new Path(args[0]));
+ FileOutputFormat.setOutputPath(job, new Path(args[1]));
+ boolean result = job.waitForCompletion(true);
+
+ // The job is done; use its counters to locate the middle word position(s).
+
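+ // MAP_OUTPUT_RECORDS counts one record per word emitted by the mappers
+ // (before the combiner runs), i.e. the total number of words read.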
+ long totalWords = job.getCounters()
+ .getGroup(TaskCounter.class.getCanonicalName())
+ .findCounter("MAP_OUTPUT_RECORDS", "Map output records").getValue();
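+ // 1-based positions of the middle element(s): for an odd total these
+ // differ by one and the median is the element at medianIndex1; for an
+ // even total they coincide at total/2.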
+ int medianIndex1 = (int) Math.ceil((totalWords / 2.0));
+ int medianIndex2 = (int) Math.floor((totalWords / 2.0));
+
+ median = readAndFindMedian(args[1], medianIndex1, medianIndex2, conf);
+
+ return (result ? 0 : 1);
+ }
+
+ public double getMedian() {
+ return median;
+ }
+}
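
To make the cumulative walk in readAndFindMedian concrete, the standalone sketch below (not part of the commit) applies the same index arithmetic to the frequency table from the method's javadoc; the class name SampleMedianCheck is illustrative only.

public class SampleMedianCheck {
  public static void main(String[] args) {
    // Frequency table from the javadoc: {word length, occurrences}.
    int[][] table = { { 1, 456 }, { 2, 132 }, { 3, 56 } };
    long totalWords = 0;
    for (int[] row : table) {
      totalWords += row[1]; // 644 words in total
    }
    int medianIndex1 = (int) Math.ceil(totalWords / 2.0);  // 322
    int medianIndex2 = (int) Math.floor(totalWords / 2.0); // 322
    // Second middle position; differs from the first only for even totals.
    int hi = (medianIndex1 == medianIndex2) ? medianIndex1 + 1 : medianIndex1;
    int num = 0;
    for (int i = 0; i < table.length; i++) {
      int prevNum = num;
      num += table[i][1];
      if (medianIndex1 > prevNum && medianIndex1 <= num) {
        double median = (hi <= num) ? table[i][0]
            : (table[i][0] + table[i + 1][0]) / 2.0;
        System.out.println("The median is: " + median); // 1.0
        return;
      }
    }
  }
}

With 644 words the middle positions are 322 and 323; both land in the length-1 bucket, so no averaging is needed. The averaging branch only fires when position total/2 + 1 spills into the next bucket, e.g. for the table 1 3, 2 3 it yields (1 + 2) / 2.0 = 1.5.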